huskies: merge 906
This commit is contained in:
@@ -44,12 +44,12 @@ pub use read::{
|
|||||||
CrdtItemDump, CrdtStateDump, check_archived_deps_crdt, check_unmet_deps_crdt,
|
CrdtItemDump, CrdtStateDump, check_archived_deps_crdt, check_unmet_deps_crdt,
|
||||||
dep_is_archived_crdt, dep_is_done_crdt, dump_crdt_state, evict_item, read_all_items, read_item,
|
dep_is_archived_crdt, dep_is_done_crdt, dump_crdt_state, evict_item, read_all_items, read_item,
|
||||||
};
|
};
|
||||||
pub use state::init;
|
pub use state::{init, subscribe};
|
||||||
pub use types::{
|
pub use types::{
|
||||||
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent,
|
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent,
|
||||||
GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView,
|
GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView,
|
||||||
NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, Stage,
|
NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, Stage,
|
||||||
TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem, subscribe,
|
TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem,
|
||||||
};
|
};
|
||||||
pub use write::{
|
pub use write::{
|
||||||
bump_retry_count, migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id,
|
bump_retry_count, migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id,
|
||||||
|
|||||||
@@ -29,6 +29,13 @@ mod tests;
|
|||||||
|
|
||||||
pub use init::init;
|
pub use init::init;
|
||||||
|
|
||||||
|
/// Subscribe to CRDT state-transition events.
|
||||||
|
///
|
||||||
|
/// Returns `None` if the CRDT layer has not been initialised yet.
|
||||||
|
pub fn subscribe() -> Option<broadcast::Receiver<super::types::CrdtEvent>> {
|
||||||
|
statics::CRDT_EVENT_TX.get().map(|tx| tx.subscribe())
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) use apply::{apply_and_persist, emit_event};
|
pub(super) use apply::{apply_and_persist, emit_event};
|
||||||
pub(super) use indices::{
|
pub(super) use indices::{
|
||||||
rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index,
|
rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index,
|
||||||
|
|||||||
@@ -94,6 +94,59 @@ fn rebuild_index_maps_story_ids() {
|
|||||||
assert!(index.contains_key("20_story_b"));
|
assert!(index.contains_key("20_story_b"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Regression for story 906: `subscribe()` and `emit_event()` must share the
|
||||||
|
/// same broadcast channel. Previously they referenced two separate
|
||||||
|
/// `CRDT_EVENT_TX` statics (one in `types.rs`, one in `state/statics.rs`),
|
||||||
|
/// so events fired into one channel and subscribers listened on the other —
|
||||||
|
/// no Matrix stage-transition notifications fired post-CRDT migration.
|
||||||
|
#[tokio::test(flavor = "current_thread")]
|
||||||
|
async fn subscribe_receives_stage_transition_events() {
|
||||||
|
use super::super::types::CrdtEvent;
|
||||||
|
use super::super::write::write_item;
|
||||||
|
|
||||||
|
crate::crdt_state::init_for_test();
|
||||||
|
|
||||||
|
let mut rx = super::subscribe().expect("subscribe must return Some after init_for_test");
|
||||||
|
|
||||||
|
// Insert a new item — emit_event fires with from_stage=None.
|
||||||
|
write_item(
|
||||||
|
"906_story_subscribe",
|
||||||
|
"1_backlog",
|
||||||
|
Some("Subscribe Wiring"),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on insert");
|
||||||
|
assert_eq!(evt.story_id, "906_story_subscribe");
|
||||||
|
assert!(evt.from_stage.is_none());
|
||||||
|
assert_eq!(evt.to_stage, "1_backlog");
|
||||||
|
|
||||||
|
// Update stage — emit_event fires again with the real from_stage.
|
||||||
|
write_item(
|
||||||
|
"906_story_subscribe",
|
||||||
|
"2_current",
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on stage change");
|
||||||
|
assert_eq!(evt.story_id, "906_story_subscribe");
|
||||||
|
assert_eq!(evt.from_stage.as_deref(), Some("1_backlog"));
|
||||||
|
assert_eq!(evt.to_stage, "2_current");
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn init_and_write_read_roundtrip() {
|
async fn init_and_write_read_roundtrip() {
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
|||||||
@@ -4,7 +4,6 @@
|
|||||||
use bft_json_crdt::json_crdt::*;
|
use bft_json_crdt::json_crdt::*;
|
||||||
use bft_json_crdt::list_crdt::ListCrdt;
|
use bft_json_crdt::list_crdt::ListCrdt;
|
||||||
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
|
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
|
||||||
use std::sync::OnceLock;
|
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
/// An event emitted when a pipeline item's stage changes in the CRDT document.
|
/// An event emitted when a pipeline item's stage changes in the CRDT document.
|
||||||
@@ -20,15 +19,6 @@ pub struct CrdtEvent {
|
|||||||
pub name: Option<String>,
|
pub name: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Subscribe to CRDT state transition events.
|
|
||||||
///
|
|
||||||
/// Returns `None` if the CRDT layer has not been initialised yet.
|
|
||||||
pub fn subscribe() -> Option<broadcast::Receiver<CrdtEvent>> {
|
|
||||||
CRDT_EVENT_TX.get().map(|tx| tx.subscribe())
|
|
||||||
}
|
|
||||||
|
|
||||||
static CRDT_EVENT_TX: OnceLock<broadcast::Sender<CrdtEvent>> = OnceLock::new();
|
|
||||||
|
|
||||||
// ── CRDT document types ──────────────────────────────────────────────
|
// ── CRDT document types ──────────────────────────────────────────────
|
||||||
|
|
||||||
/// CRDT sub-document holding gateway-level configuration.
|
/// CRDT sub-document holding gateway-level configuration.
|
||||||
|
|||||||
Reference in New Issue
Block a user