huskies: merge 643_story_web_ui_consumer_for_the_unified_status_broadcaster
This commit is contained in:
@@ -43,6 +43,12 @@ pub struct ProjectConfig {
|
||||
/// Default: `true`.
|
||||
#[serde(default = "default_rate_limit_notifications")]
|
||||
pub rate_limit_notifications: bool,
|
||||
/// Whether the web UI WebSocket consumer subscribes to the status broadcaster.
|
||||
/// Set to `false` to disable status event forwarding to the web UI without
|
||||
/// affecting other consumers (chat transports, agent context).
|
||||
/// Default: `true`.
|
||||
#[serde(default = "default_web_ui_status_consumer")]
|
||||
pub web_ui_status_consumer: bool,
|
||||
/// IANA timezone name (e.g. `"Europe/London"`, `"America/New_York"`).
|
||||
/// When set, timer HH:MM inputs are interpreted in this timezone instead
|
||||
/// of the container/host local time. Falls back to `chrono::Local` when absent.
|
||||
@@ -123,6 +129,10 @@ fn default_rate_limit_notifications() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn default_web_ui_status_consumer() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn default_max_mesh_peers() -> usize {
|
||||
3
|
||||
}
|
||||
@@ -252,6 +262,7 @@ impl Default for ProjectConfig {
|
||||
max_retries: default_max_retries(),
|
||||
base_branch: None,
|
||||
rate_limit_notifications: default_rate_limit_notifications(),
|
||||
web_ui_status_consumer: default_web_ui_status_consumer(),
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -333,6 +344,7 @@ impl ProjectConfig {
|
||||
max_retries: legacy.max_retries,
|
||||
base_branch: legacy.base_branch,
|
||||
rate_limit_notifications: legacy.rate_limit_notifications,
|
||||
web_ui_status_consumer: default_web_ui_status_consumer(),
|
||||
timezone: legacy.timezone,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -365,6 +377,7 @@ impl ProjectConfig {
|
||||
max_retries: legacy.max_retries,
|
||||
base_branch: legacy.base_branch,
|
||||
rate_limit_notifications: legacy.rate_limit_notifications,
|
||||
web_ui_status_consumer: default_web_ui_status_consumer(),
|
||||
timezone: legacy.timezone,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -385,6 +398,7 @@ impl ProjectConfig {
|
||||
max_retries: legacy.max_retries,
|
||||
base_branch: legacy.base_branch,
|
||||
rate_limit_notifications: legacy.rate_limit_notifications,
|
||||
web_ui_status_consumer: default_web_ui_status_consumer(),
|
||||
timezone: legacy.timezone,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! WebSocket transport adapter — accept connection, serialise/deserialise frames,
|
||||
//! invoke service methods. No business logic, no inline state transitions.
|
||||
|
||||
use crate::config::ProjectConfig;
|
||||
use crate::http::context::AppContext;
|
||||
use crate::llm::chat;
|
||||
use crate::service::ws::{self, WsResponse};
|
||||
@@ -56,6 +57,18 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
|
||||
ws::subscribe_watcher(tx.clone(), ctx.clone(), ctx.watcher_tx.subscribe());
|
||||
ws::subscribe_reconciliation(tx.clone(), ctx.reconciliation_tx.subscribe());
|
||||
|
||||
// Subscribe to the status broadcaster if web UI consumer is enabled (default: true).
|
||||
let status_enabled = ctx
|
||||
.state
|
||||
.get_project_root()
|
||||
.ok()
|
||||
.and_then(|root| ProjectConfig::load(&root).ok())
|
||||
.map(|c| c.web_ui_status_consumer)
|
||||
.unwrap_or(true);
|
||||
if status_enabled {
|
||||
ws::subscribe_status(tx.clone(), ctx.services.status.subscribe());
|
||||
}
|
||||
|
||||
// Map of pending permission request_id -> oneshot responder.
|
||||
let mut pending_perms: HashMap<String, oneshot::Sender<PermissionDecision>> =
|
||||
HashMap::new();
|
||||
@@ -230,6 +243,7 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
use crate::service::status::StatusEvent;
|
||||
|
||||
// ── ws_handler integration tests (real WebSocket connection) ─────
|
||||
|
||||
@@ -534,4 +548,158 @@ mod tests {
|
||||
let (_sink2, _stream2, initial2) = connect_ws(&url).await;
|
||||
assert_eq!(initial2["type"], "pipeline_state");
|
||||
}
|
||||
|
||||
/// Read the next `status_update` whose story_id or story_name contains `needle`,
|
||||
/// within a timeout. Skips `log_entry` noise and unrelated status events so
|
||||
/// genuine server log noise cannot cause false positives or negatives.
|
||||
async fn next_status_update_containing(
|
||||
stream: &mut futures::stream::SplitStream<
|
||||
tokio_tungstenite::WebSocketStream<
|
||||
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
|
||||
>,
|
||||
>,
|
||||
needle: &str,
|
||||
timeout_ms: u64,
|
||||
) -> Option<serde_json::Value> {
|
||||
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
|
||||
loop {
|
||||
let remaining = deadline.saturating_duration_since(std::time::Instant::now());
|
||||
if remaining.is_zero() {
|
||||
return None;
|
||||
}
|
||||
let msg = tokio::time::timeout(remaining, stream.next())
|
||||
.await
|
||||
.ok()?
|
||||
.expect("stream ended")
|
||||
.expect("ws error");
|
||||
let val: serde_json::Value = match msg {
|
||||
tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).ok()?,
|
||||
_ => continue,
|
||||
};
|
||||
if val["type"] == "status_update" {
|
||||
let event = &val["event"];
|
||||
let story_id = event["story_id"].as_str().unwrap_or("");
|
||||
let story_name = event["story_name"].as_str().unwrap_or("");
|
||||
if story_id.contains(needle) || story_name.contains(needle) {
|
||||
return Some(val);
|
||||
}
|
||||
}
|
||||
// Skip log_entry and other unrelated messages.
|
||||
}
|
||||
}
|
||||
|
||||
// ── Status broadcaster integration tests ─────────────────────────
|
||||
|
||||
/// Publishing a status event via `services.status` must result in a
|
||||
/// `status_update` WebSocket message with structured fields delivered to the
|
||||
/// connected client.
|
||||
#[tokio::test]
|
||||
async fn ws_handler_forwards_status_events_as_status_update() {
|
||||
let (url, ctx) = start_test_server().await;
|
||||
let (_sink, mut stream, _initial) = connect_ws(&url).await;
|
||||
|
||||
// Use a story ID unique enough that genuine server logs won't match it.
|
||||
ctx.services.status.publish(StatusEvent::StageTransition {
|
||||
story_id: "77_story_status_fwd_test".to_string(),
|
||||
story_name: Some("StatusFwdTest".to_string()),
|
||||
from_stage: "1_backlog".to_string(),
|
||||
to_stage: "2_current".to_string(),
|
||||
});
|
||||
|
||||
// The handler must forward it as a status_update with structured fields.
|
||||
let msg = next_status_update_containing(&mut stream, "StatusFwdTest", 2000)
|
||||
.await
|
||||
.expect("expected a status_update for the status event");
|
||||
assert_eq!(msg["type"], "status_update");
|
||||
let event = &msg["event"];
|
||||
assert_eq!(event["type"], "stage_transition");
|
||||
assert_eq!(event["story_id"], "77_story_status_fwd_test");
|
||||
assert_eq!(event["story_name"], "StatusFwdTest");
|
||||
assert_eq!(event["from_stage"], "1_backlog");
|
||||
assert_eq!(event["to_stage"], "2_current");
|
||||
}
|
||||
|
||||
/// Multi-project isolation: a client connected to project A's server must
|
||||
/// NOT receive status events published on project B's broadcaster.
|
||||
#[tokio::test]
|
||||
async fn ws_handler_multi_project_status_isolation() {
|
||||
// Start two independent servers (each with its own AppContext / Services).
|
||||
let (url_a, ctx_a) = start_test_server().await;
|
||||
let (url_b, _ctx_b) = start_test_server().await;
|
||||
|
||||
let (_sink_a, mut stream_a, _) = connect_ws(&url_a).await;
|
||||
let (_sink_b, mut stream_b, _) = connect_ws(&url_b).await;
|
||||
|
||||
// Use a needle unique enough that genuine server logs won't match.
|
||||
let needle = "ProjAIsolation7734";
|
||||
ctx_a.services.status.publish(StatusEvent::MergeFailure {
|
||||
story_id: "10_story_proj_a_isolation".to_string(),
|
||||
story_name: Some(needle.to_string()),
|
||||
reason: "conflict".to_string(),
|
||||
});
|
||||
|
||||
// Client A must receive the status_update with structured fields.
|
||||
let msg_a = next_status_update_containing(&mut stream_a, needle, 2000)
|
||||
.await
|
||||
.expect("client A should receive the status event");
|
||||
assert_eq!(msg_a["type"], "status_update");
|
||||
assert_eq!(msg_a["event"]["story_name"], needle);
|
||||
|
||||
// Client B must NOT receive any status_update containing the needle.
|
||||
let msg_b = next_status_update_containing(&mut stream_b, needle, 300).await;
|
||||
assert!(
|
||||
msg_b.is_none(),
|
||||
"client B must not receive project A's status event, got: {msg_b:?}"
|
||||
);
|
||||
}
|
||||
|
||||
/// When `web_ui_status_consumer = false` in project.toml, the WebSocket
|
||||
/// handler must not forward status events to the connected client.
|
||||
#[tokio::test]
|
||||
async fn ws_handler_status_consumer_disabled_via_config() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let root = tmp.path().to_path_buf();
|
||||
|
||||
// Write a project.toml that disables the web UI status consumer.
|
||||
let huskies_dir = root.join(".huskies");
|
||||
std::fs::create_dir_all(&huskies_dir).unwrap();
|
||||
std::fs::write(
|
||||
huskies_dir.join("project.toml"),
|
||||
"web_ui_status_consumer = false\n",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
crate::db::ensure_content_store();
|
||||
let ctx = Arc::new(AppContext::new_test(root));
|
||||
let ctx_data = ctx.clone();
|
||||
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let app = poem::Route::new()
|
||||
.at("/ws", poem::get(ws_handler))
|
||||
.data(ctx_data);
|
||||
tokio::spawn(async move {
|
||||
let acceptor = poem::listener::TcpAcceptor::from_tokio(listener).unwrap();
|
||||
let _ = poem::Server::new_with_acceptor(acceptor).run(app).await;
|
||||
});
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
|
||||
let url = format!("ws://127.0.0.1:{}/ws", addr.port());
|
||||
let (_sink, mut stream, _) = connect_ws(&url).await;
|
||||
|
||||
// Use a unique needle — genuine server logs will never contain this.
|
||||
let needle = "DisabledConsumer9182";
|
||||
ctx.services.status.publish(StatusEvent::StoryBlocked {
|
||||
story_id: "55_story_disabled_consumer".to_string(),
|
||||
story_name: Some(needle.to_string()),
|
||||
reason: "test".to_string(),
|
||||
});
|
||||
|
||||
// Consumer is disabled — no status_update with this needle should arrive.
|
||||
let msg = next_status_update_containing(&mut stream, needle, 500).await;
|
||||
assert!(
|
||||
msg.is_none(),
|
||||
"disabled consumer must not forward status events, got: {msg:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,11 +9,12 @@ use crate::service::notifications::format::stage_display_name;
|
||||
use crate::service::status::StatusEvent;
|
||||
|
||||
/// Render a [`StatusEvent`] into a human-readable plain-text string.
|
||||
#[allow(dead_code)]
|
||||
///
|
||||
/// This is the single formatter for all status event types. Every transport
|
||||
/// (chat, Web UI, agent context) calls this function rather than duplicating
|
||||
/// formatting logic.
|
||||
// Used by chat/agent transports (stories 642/644); the web UI uses StatusUpdate frames instead.
|
||||
#[allow(dead_code)]
|
||||
pub fn format_status_event(event: &StatusEvent) -> String {
|
||||
match event {
|
||||
StatusEvent::StageTransition {
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
pub mod format;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::Serialize;
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
@@ -44,7 +45,8 @@ const CHANNEL_CAPACITY: usize = 256;
|
||||
///
|
||||
/// Each variant carries enough context for [`format_status_event`] to render a
|
||||
/// human-readable message without additional lookups.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum StatusEvent {
|
||||
/// A work item moved between pipeline stages.
|
||||
StageTransition {
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::io::onboarding;
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
use crate::io::wizard;
|
||||
use crate::log_buffer;
|
||||
use crate::service::status::Subscription;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
|
||||
@@ -116,6 +117,21 @@ pub fn subscribe_watcher(
|
||||
});
|
||||
}
|
||||
|
||||
/// Spawn a background task that forwards status broadcaster events to the client.
|
||||
///
|
||||
/// Each [`StatusEvent`](crate::service::status::StatusEvent) is delivered as a
|
||||
/// [`WsResponse::StatusUpdate`] with the structured event fields intact, so the
|
||||
/// frontend can do per-type presentation without parsing strings.
|
||||
pub fn subscribe_status(tx: mpsc::UnboundedSender<WsResponse>, mut subscription: Subscription) {
|
||||
tokio::spawn(async move {
|
||||
while let Some(event) = subscription.recv().await {
|
||||
if tx.send(WsResponse::StatusUpdate { event }).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Spawn a background task that forwards reconciliation events to the client.
|
||||
pub fn subscribe_reconciliation(
|
||||
tx: mpsc::UnboundedSender<WsResponse>,
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::http::workflow::{PipelineState, UpcomingStory};
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
use crate::llm::chat;
|
||||
use crate::llm::types::Message;
|
||||
use crate::service::status::StatusEvent;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// WebSocket request messages sent by the client.
|
||||
@@ -153,6 +154,15 @@ pub enum WsResponse {
|
||||
level: String,
|
||||
message: String,
|
||||
},
|
||||
/// A structured pipeline status event forwarded from the status broadcaster.
|
||||
///
|
||||
/// The structured [`StatusEvent`] fields are preserved on the wire so
|
||||
/// frontend consumers can do per-type presentation without parsing strings.
|
||||
/// This frame intentionally does NOT call `format_status_event` — that
|
||||
/// formatter is reserved for chat transports (story 644).
|
||||
StatusUpdate {
|
||||
event: StatusEvent,
|
||||
},
|
||||
}
|
||||
|
||||
// ── Domain event conversions ────────────────────────────────────────────────
|
||||
|
||||
@@ -20,6 +20,6 @@ pub use dispatch::{
|
||||
};
|
||||
pub use io::{
|
||||
check_onboarding, load_initial_pipeline_state, load_recent_logs, load_wizard_state,
|
||||
subscribe_logs, subscribe_reconciliation, subscribe_watcher,
|
||||
subscribe_logs, subscribe_reconciliation, subscribe_status, subscribe_watcher,
|
||||
};
|
||||
pub use message::{WizardStepInfo, WsResponse};
|
||||
|
||||
@@ -528,6 +528,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -557,6 +558,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -586,6 +588,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -615,6 +618,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -643,6 +647,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -678,6 +683,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -754,6 +760,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -788,6 +795,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -869,6 +877,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -906,6 +915,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -933,6 +943,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
@@ -966,6 +977,7 @@ mod tests {
|
||||
max_retries: 2,
|
||||
base_branch: None,
|
||||
rate_limit_notifications: true,
|
||||
web_ui_status_consumer: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
|
||||
Reference in New Issue
Block a user