huskies: merge 1136 story Sled → gateway WebSocket back-channel so project pipeline events reach Timmy
This commit is contained in:
@@ -271,4 +271,98 @@ mod tests {
|
|||||||
spawn_relay_task(String::new(), "test".into(), broadcaster, client);
|
spawn_relay_task(String::new(), "test".into(), broadcaster, client);
|
||||||
// If we reach here without panic, the guard worked.
|
// If we reach here without panic, the guard worked.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// End-to-end: a `TransitionFired`-equivalent event published on the sled's
|
||||||
|
/// broadcaster must reach the gateway's [`GatewayStatusEvent`] broadcast
|
||||||
|
/// within 1 second.
|
||||||
|
///
|
||||||
|
/// Spins up a real poem HTTP server (token endpoint + WS event-push endpoint),
|
||||||
|
/// spawns the relay task pointing at it, fires a [`StatusEvent::StageTransition`],
|
||||||
|
/// and asserts the gateway broadcast receives the matching [`StoredEvent`].
|
||||||
|
#[tokio::test]
|
||||||
|
async fn relay_end_to_end_stage_transition_reaches_gateway_broadcast() {
|
||||||
|
use crate::http::gateway::{gateway_event_push_handler, gateway_generate_token_handler};
|
||||||
|
use crate::service::gateway::{GatewayConfig, GatewayState, ProjectEntry};
|
||||||
|
use poem::EndpointExt as _;
|
||||||
|
use poem::listener::TcpAcceptor;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
|
||||||
|
crate::crdt_state::init_for_test();
|
||||||
|
|
||||||
|
// Gateway state: one project whose name matches the relay project name.
|
||||||
|
let mut projects = BTreeMap::new();
|
||||||
|
projects.insert(
|
||||||
|
"sled-test".to_string(),
|
||||||
|
ProjectEntry::with_url("http://sled-test:3001"),
|
||||||
|
);
|
||||||
|
let config = GatewayConfig {
|
||||||
|
projects,
|
||||||
|
sled_tokens: BTreeMap::new(),
|
||||||
|
};
|
||||||
|
let state = Arc::new(GatewayState::new(config, PathBuf::new(), 9000).unwrap());
|
||||||
|
|
||||||
|
// Subscribe before the relay connects so the event is not missed.
|
||||||
|
let mut gw_rx = state.event_tx.subscribe();
|
||||||
|
|
||||||
|
// Start a poem server on an ephemeral loopback port exposing the real
|
||||||
|
// token and event-push handlers.
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
let gateway_url = format!("http://127.0.0.1:{}", addr.port());
|
||||||
|
|
||||||
|
let route = poem::Route::new()
|
||||||
|
.at(
|
||||||
|
"/gateway/tokens",
|
||||||
|
poem::post(gateway_generate_token_handler),
|
||||||
|
)
|
||||||
|
.at(
|
||||||
|
"/gateway/events/push",
|
||||||
|
poem::get(gateway_event_push_handler),
|
||||||
|
)
|
||||||
|
.data(state.clone());
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let acceptor = TcpAcceptor::from_tokio(listener).unwrap();
|
||||||
|
let _ = poem::Server::new_with_acceptor(acceptor).run(route).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Spawn the relay task pointing at our in-process gateway server.
|
||||||
|
let broadcaster = Arc::new(StatusBroadcaster::new());
|
||||||
|
spawn_relay_task(
|
||||||
|
gateway_url,
|
||||||
|
"sled-test".into(),
|
||||||
|
Arc::clone(&broadcaster),
|
||||||
|
reqwest::Client::new(),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Give the relay time to obtain a join token, connect the WebSocket,
|
||||||
|
// and enter its event-receive loop before we publish.
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
||||||
|
|
||||||
|
// Publish a stage transition on the sled side.
|
||||||
|
broadcaster.publish(StatusEvent::StageTransition {
|
||||||
|
story_id: "42_story_relay_e2e".into(),
|
||||||
|
story_name: "Relay E2E".into(),
|
||||||
|
from_stage: "1_backlog".into(),
|
||||||
|
to_stage: "2_current".into(),
|
||||||
|
});
|
||||||
|
|
||||||
|
// The event must arrive at the gateway broadcast within 1 second.
|
||||||
|
let received = tokio::time::timeout(std::time::Duration::from_secs(1), gw_rx.recv())
|
||||||
|
.await
|
||||||
|
.expect("timed out: event did not arrive at gateway broadcast within 1 s")
|
||||||
|
.expect("gateway broadcast channel closed unexpectedly");
|
||||||
|
|
||||||
|
assert_eq!(received.project, "sled-test");
|
||||||
|
assert!(
|
||||||
|
matches!(
|
||||||
|
received.event,
|
||||||
|
StoredEvent::StageTransition { ref story_id, .. } if story_id == "42_story_relay_e2e"
|
||||||
|
),
|
||||||
|
"unexpected gateway event: {:?}",
|
||||||
|
received.event
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user