huskies: merge 995

This commit is contained in:
dave
2026-05-14 07:51:16 +00:00
parent 52180bc402
commit 4520e0e6f9
8 changed files with 423 additions and 319 deletions
+7 -13
View File
@@ -9,8 +9,6 @@ use crate::io::watcher::WatcherEvent;
/// The notification action to take in response to a [`WatcherEvent`].
#[derive(Debug, PartialEq)]
pub enum EventAction {
/// Post a stage-transition notification; the event carries a known source stage.
StageTransition,
/// Post a merge-failure error notification.
MergeFailure,
/// Post a rate-limit warning (subject to config/debounce suppression).
@@ -39,15 +37,9 @@ pub enum EventAction {
/// Classify a [`WatcherEvent`] into the action the notification listener should take.
pub fn classify(event: &WatcherEvent) -> EventAction {
match event {
WatcherEvent::WorkItem { from_stage, .. } => {
if from_stage.is_some() {
EventAction::StageTransition
} else {
// Synthetic events (creation, reassign) have no from_stage.
// Posting a notification for these would produce incorrect messages.
EventAction::Skip
}
}
// Stage-change notifications are now handled by the TransitionFired subscriber
// (story 995). WorkItem events are skipped regardless of from_stage.
WatcherEvent::WorkItem { .. } => EventAction::Skip,
WatcherEvent::MergeFailure { .. } => EventAction::MergeFailure,
WatcherEvent::RateLimitWarning { .. } => EventAction::RateLimitWarning,
WatcherEvent::StoryBlocked { .. } => EventAction::StoryBlocked,
@@ -77,10 +69,12 @@ mod tests {
}
}
// Stage-change notifications moved to TransitionFired subscriber (story 995).
// All WorkItem events are now classified as Skip regardless of from_stage.
#[test]
fn work_item_with_from_stage_is_stage_transition() {
fn work_item_with_from_stage_is_skip() {
let event = work_item(Some("2_current"));
assert_eq!(classify(&event), EventAction::StageTransition);
assert_eq!(classify(&event), EventAction::Skip);
}
#[test]
+6 -114
View File
@@ -4,7 +4,6 @@
use crate::chat::ChatTransport;
use crate::config::ProjectConfig;
use crate::io::watcher::WatcherEvent;
use crate::pipeline_state::Stage;
use crate::slog;
use std::collections::HashMap;
use std::path::PathBuf;
@@ -13,14 +12,11 @@ use std::time::Instant;
use tokio::sync::broadcast;
use super::super::events::classify;
use super::super::filter::{
AGENT_EVENT_DEBOUNCE, STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit,
};
use super::super::filter::{AGENT_EVENT_DEBOUNCE, should_send_rate_limit};
use super::super::format::{
format_agent_completed_notification, format_agent_started_notification,
format_blocked_notification, format_error_notification, format_oauth_account_swapped,
format_oauth_accounts_exhausted, format_rate_limit_notification, format_stage_notification,
merge_failure_snippet,
format_oauth_accounts_exhausted, format_rate_limit_notification, merge_failure_snippet,
};
use super::super::route::rooms_for_notification;
use super::{find_story_name_any_stage, read_story_name};
@@ -47,14 +43,6 @@ pub fn spawn_notification_listener(
// "story_id:agent_name" key, to debounce repeated warnings.
let mut rate_limit_last_notified: HashMap<String, Instant> = HashMap::new();
// Pending stage-transition notifications, keyed by item_id.
// Value: (from_stage, to_stage, story_name).
// Rapid successive transitions for the same item are coalesced: the
// original `from_stage` is kept while `to_stage` is updated to the
// latest destination, so only one notification fires for the final stage.
let mut pending_transitions: HashMap<String, (Stage, Stage, String)> = HashMap::new();
let mut flush_deadline: Option<tokio::time::Instant> = None;
// Pending agent-status notifications, keyed by "{story_id}:{event_kind}".
// Value: (plain, html). Rapid successive events for the same story and
// event kind are coalesced: only the latest is sent after the debounce
@@ -63,48 +51,17 @@ pub fn spawn_notification_listener(
let mut agent_flush_deadline: Option<tokio::time::Instant> = None;
loop {
// Pick the earliest of the two debounce deadlines.
let earliest_deadline = match (flush_deadline, agent_flush_deadline) {
(Some(a), Some(b)) => Some(a.min(b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
};
// Wait for the next event, or flush pending notifications when the
// earliest debounce window expires.
let recv_result = if let Some(deadline) = earliest_deadline {
// Wait for the next event, or flush pending agent notifications when
// the debounce window expires.
let recv_result = if let Some(deadline) = agent_flush_deadline {
tokio::time::timeout_at(deadline, rx.recv()).await.ok()
} else {
Some(rx.recv().await)
};
if recv_result.is_none() {
let now = tokio::time::Instant::now();
// Flush stage transitions if their deadline has passed.
if flush_deadline.is_some_and(|d| d <= now) {
for (item_id, (from_stage, to_stage, story_name)) in pending_transitions.drain()
{
let (plain, html) = format_stage_notification(
&item_id,
&story_name,
&from_stage,
&to_stage,
);
slog!("[bot] Sending stage notification: {plain}");
if config.status_push_enabled {
for room_id in &rooms_for_notification(&get_room_ids) {
if let Err(e) = transport.send_message(room_id, &plain, &html).await
{
slog!("[bot] Failed to send notification to {room_id}: {e}");
}
}
}
}
flush_deadline = None;
}
// Flush agent events if their deadline has passed.
if agent_flush_deadline.is_some_and(|d| d <= now) {
if agent_flush_deadline.is_some_and(|d| d <= tokio::time::Instant::now()) {
for (_key, (plain, html)) in pending_agent_events.drain() {
slog!("[bot] Sending agent notification: {plain}");
if config.status_push_enabled {
@@ -131,25 +88,7 @@ pub fn spawn_notification_listener(
}
Err(broadcast::error::RecvError::Closed) => {
slog!("[bot] Watcher channel closed, stopping notification listener");
// Flush any coalesced transitions that haven't fired yet.
if config.status_push_enabled {
for (item_id, (from_stage, to_stage, story_name)) in
pending_transitions.drain()
{
let (plain, html) = format_stage_notification(
&item_id,
&story_name,
&from_stage,
&to_stage,
);
slog!("[bot] Sending stage notification: {plain}");
for room_id in &rooms_for_notification(&get_room_ids) {
if let Err(e) = transport.send_message(room_id, &plain, &html).await
{
slog!("[bot] Failed to send notification to {room_id}: {e}");
}
}
}
for (_key, (plain, html)) in pending_agent_events.drain() {
slog!("[bot] Sending agent notification: {plain}");
for room_id in &rooms_for_notification(&get_room_ids) {
@@ -168,53 +107,6 @@ pub fn spawn_notification_listener(
use super::super::events::EventAction;
match classify(&event) {
EventAction::StageTransition => {
if !config.status_push_enabled {
continue;
}
// WorkItem with a known from_stage — extract the fields.
let WatcherEvent::WorkItem {
ref stage,
ref item_id,
ref from_stage,
..
} = event
else {
continue;
};
let from_typed = from_stage
.as_deref()
.and_then(Stage::from_dir)
.unwrap_or(Stage::Upcoming);
let to_typed = Stage::from_dir(stage).unwrap_or(Stage::Upcoming);
// Look up the story name in the expected stage directory; fall
// back to a full search so stale events still show the name.
let story_name = {
let n = read_story_name(&project_root, stage, item_id);
if n.is_empty() {
find_story_name_any_stage(&project_root, item_id)
} else {
n
}
};
// Buffer the transition. If this item_id is already pending (rapid
// succession), update the destination stage to the latest while
// preserving the original from_stage.
pending_transitions
.entry(item_id.clone())
.and_modify(|e| {
e.1 = to_typed.clone();
if !story_name.is_empty() {
e.2 = story_name.clone();
}
})
.or_insert_with(|| (from_typed, to_typed, story_name));
// Start or extend the debounce window.
flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE);
}
EventAction::MergeFailure => {
if !config.status_push_enabled {
continue;
@@ -9,12 +9,18 @@ use std::path::Path;
mod listener;
pub use listener::spawn_notification_listener;
/// Subscriber that fires stage-change notifications via the pipeline event bus.
pub(super) mod stage_subscriber;
pub use stage_subscriber::spawn_stage_notification_subscriber;
#[cfg(test)]
mod mock_transport;
#[cfg(test)]
mod tests_notifications;
#[cfg(test)]
mod tests_stage;
#[cfg(test)]
mod tests_transition;
/// Read the story name from the typed CRDT register.
///
@@ -0,0 +1,119 @@
//! Stage-transition notification subscriber.
//!
//! Subscribes to the [`TransitionFired`] broadcast channel and dispatches
//! chat notifications for each significant pipeline stage change.
//! This is the **single** emitter of stage-change notifications; all other
//! call sites have been removed (story 995).
use crate::chat::ChatTransport;
use crate::config::ProjectConfig;
use crate::pipeline_state::Stage;
use crate::service::notifications::filter::STAGE_TRANSITION_DEBOUNCE;
use crate::service::notifications::format::format_stage_notification;
use crate::service::notifications::route::rooms_for_notification;
use crate::slog;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::broadcast;
/// Spawn a background task that subscribes to typed [`TransitionFired`] events
/// and posts stage-change notifications to all configured chat rooms.
///
/// Coalesces rapid successive transitions for the same item into a single
/// notification (200 ms debounce window) so only the final stage is announced.
/// Config is read once at startup; `status_push_enabled = false` suppresses all
/// notifications from this subscriber.
pub fn spawn_stage_notification_subscriber(
transport: Arc<dyn ChatTransport>,
get_room_ids: impl Fn() -> Vec<String> + Send + Sync + 'static,
project_root: PathBuf,
) {
let mut rx = crate::pipeline_state::subscribe_transitions();
tokio::spawn(async move {
let config = ProjectConfig::load(&project_root).unwrap_or_default();
// Pending notifications keyed by story_id: (from_stage, to_stage, story_name).
// Rapid transitions are coalesced: original from preserved, to updated.
let mut pending: HashMap<String, (Stage, Stage, String)> = HashMap::new();
let mut flush_deadline: Option<tokio::time::Instant> = None;
loop {
let recv_result = if let Some(deadline) = flush_deadline {
tokio::time::timeout_at(deadline, rx.recv()).await.ok()
} else {
Some(rx.recv().await)
};
// Timeout → flush coalesced notifications.
if recv_result.is_none() {
if flush_deadline.is_some_and(|d| d <= tokio::time::Instant::now()) {
send_pending(&pending, &transport, &get_room_ids).await;
pending.clear();
flush_deadline = None;
}
continue;
}
let fired = match recv_result.unwrap() {
Ok(f) => f,
Err(broadcast::error::RecvError::Lagged(n)) => {
slog!("[bot/transition] Subscriber lagged; skipped {n} event(s)");
continue;
}
Err(broadcast::error::RecvError::Closed) => {
send_pending(&pending, &transport, &get_room_ids).await;
break;
}
};
if !config.status_push_enabled {
continue;
}
// Upcoming is the only stage we suppress; every other stage
// arrival produces a notification.
if matches!(&fired.after, Stage::Upcoming) {
continue;
}
let story_name =
super::read_story_name(&project_root, fired.after.dir_name(), &fired.story_id.0);
// Coalesce: keep original from_stage, update to_stage and name.
pending
.entry(fired.story_id.0.clone())
.and_modify(|e| {
e.1 = fired.after.clone();
if !story_name.is_empty() {
e.2 = story_name.clone();
}
})
.or_insert_with(|| (fired.before.clone(), fired.after.clone(), story_name));
// Set the deadline once from the first arriving event so that
// concurrent test broadcasts on the global channel do not keep
// pushing the window out and starving the flush.
if flush_deadline.is_none() {
flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE);
}
}
});
}
/// Send all pending stage-transition notifications and log each one.
async fn send_pending(
pending: &HashMap<String, (Stage, Stage, String)>,
transport: &Arc<dyn ChatTransport>,
get_room_ids: &(impl Fn() -> Vec<String> + Sync),
) {
for (item_id, (from_stage, to_stage, story_name)) in pending {
let (plain, html) = format_stage_notification(item_id, story_name, from_stage, to_stage);
slog!("[bot/transition] Sending stage notification: {plain}");
for room_id in &rooms_for_notification(get_room_ids) {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
slog!("[bot/transition] Failed to send notification to {room_id}: {e}");
}
}
}
}
@@ -1,106 +1,9 @@
//! Tests for stage-transition notifications and `read_story_name`.
//! Tests for `read_story_name`.
//!
//! Stage-transition notification tests have moved to `tests_transition.rs`
//! which uses `spawn_stage_notification_subscriber` and real CRDT transitions.
use super::mock_transport::MockTransport;
use super::{read_story_name, spawn_notification_listener};
use crate::io::watcher::WatcherEvent;
use std::sync::Arc;
use tokio::sync::broadcast;
// ── dynamic room IDs (WhatsApp ambient_rooms pattern) ───────────────────────
/// Notifications are sent to the rooms returned by the closure at
/// notification time, not at listener-spawn time. This verifies that a
/// closure backed by a runtime set (e.g. WhatsApp ambient_rooms) delivers
/// messages to the rooms present when the event fires.
#[tokio::test]
async fn stage_notification_uses_dynamic_room_ids() {
let tmp = tempfile::tempdir().unwrap();
// Seed story via CRDT (the only source of truth).
crate::db::ensure_content_store();
crate::db::write_item_with_content(
"10_story_foo",
"3_qa",
"---\nname: Foo Story\n---\n",
crate::db::ItemMeta::named("Foo Story"),
);
let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
let (transport, calls) = MockTransport::new();
let rooms: Arc<std::sync::Mutex<std::collections::HashSet<String>>> =
Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()));
let rooms_for_closure = Arc::clone(&rooms);
spawn_notification_listener(
transport,
move || rooms_for_closure.lock().unwrap().iter().cloned().collect(),
watcher_rx,
tmp.path().to_path_buf(),
);
// Add a room after the listener is spawned (simulates a user messaging first).
rooms
.lock()
.unwrap()
.insert("phone:+15551234567".to_string());
watcher_tx
.send(WatcherEvent::WorkItem {
stage: "qa".to_string(),
item_id: "10_story_foo".to_string(),
action: "qa".to_string(),
commit_msg: "huskies: qa 10_story_foo".to_string(),
from_stage: Some("coding".to_string()),
})
.unwrap();
// Wait longer than STAGE_TRANSITION_DEBOUNCE (200ms) so the coalesced
// notification flushes.
tokio::time::sleep(std::time::Duration::from_millis(350)).await;
let calls = calls.lock().unwrap();
assert_eq!(
calls.len(),
1,
"Should deliver to the dynamically added room"
);
assert_eq!(calls[0].0, "phone:+15551234567");
assert!(
calls[0].1.contains("10"),
"plain should contain story number"
);
assert!(
calls[0].1.contains("Foo Story"),
"plain should contain story name"
);
}
/// When no rooms are registered (e.g. no WhatsApp users have messaged yet),
/// no notifications are sent and the listener does not panic.
#[tokio::test]
async fn stage_notification_with_no_rooms_is_silent() {
let tmp = tempfile::tempdir().unwrap();
let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
let (transport, calls) = MockTransport::new();
spawn_notification_listener(transport, Vec::new, watcher_rx, tmp.path().to_path_buf());
watcher_tx
.send(WatcherEvent::WorkItem {
stage: "qa".to_string(),
item_id: "10_story_foo".to_string(),
action: "qa".to_string(),
commit_msg: "huskies: qa 10_story_foo".to_string(),
from_stage: Some("coding".to_string()),
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let calls = calls.lock().unwrap();
assert_eq!(calls.len(), 0, "No rooms means no notifications");
}
use super::read_story_name;
// ── read_story_name ──────────────────────────────────────────────────────────
@@ -141,93 +44,3 @@ fn read_story_name_returns_none_for_missing_name_field() {
let name = read_story_name(tmp.path(), "2_current", "9943_story_no_name");
assert!(name.is_empty());
}
// ── Bug 549: synthetic events with from_stage=None must not notify ───────────
/// Synthetic events (reassign, creation) have from_stage=None and must
/// not produce stage-transition notifications. Before the fix, the
/// inferred_from_stage fallback would emit e.g. "QA → Merge" for a
/// reassign event within the merge stage.
#[tokio::test]
async fn synthetic_event_without_from_stage_does_not_notify() {
let tmp = tempfile::tempdir().unwrap();
let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
let (transport, calls) = MockTransport::new();
spawn_notification_listener(
transport,
|| vec!["!room1:example.org".to_string()],
watcher_rx,
tmp.path().to_path_buf(),
);
// Synthetic reassign event within 4_merge — no actual stage change.
watcher_tx
.send(WatcherEvent::WorkItem {
stage: "merge".to_string(),
item_id: "549_story_skip_qa".to_string(),
action: "reassign".to_string(),
commit_msg: String::new(),
from_stage: None,
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(350)).await;
let calls = calls.lock().unwrap();
assert_eq!(
calls.len(),
0,
"Synthetic events with from_stage=None must not generate notifications"
);
}
/// Stories that skip QA (qa: server) move directly from Current to Merge.
/// The notification must say "Current → Merge", not "QA → Merge".
#[tokio::test]
async fn skip_qa_shows_current_to_merge_not_qa_to_merge() {
let tmp = tempfile::tempdir().unwrap();
let stage_dir = tmp.path().join(".huskies").join("work").join("4_merge");
std::fs::create_dir_all(&stage_dir).unwrap();
std::fs::write(
stage_dir.join("549_story_skip_qa.md"),
"---\nname: Skip QA Story\n---\n",
)
.unwrap();
let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
let (transport, calls) = MockTransport::new();
spawn_notification_listener(
transport,
|| vec!["!room1:example.org".to_string()],
watcher_rx,
tmp.path().to_path_buf(),
);
// Story skips QA: from_stage is 2_current, not 3_qa.
watcher_tx
.send(WatcherEvent::WorkItem {
stage: "merge".to_string(),
item_id: "549_story_skip_qa".to_string(),
action: "merge".to_string(),
commit_msg: "huskies: merge 549_story_skip_qa".to_string(),
from_stage: Some("coding".to_string()),
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(350)).await;
let calls = calls.lock().unwrap();
assert_eq!(calls.len(), 1, "Should send exactly one notification");
assert!(
calls[0].1.contains("Current \u{2192} Merge"),
"Notification should say 'Current → Merge', got: {}",
calls[0].1
);
assert!(
!calls[0].1.contains("QA \u{2192} Merge"),
"Must NOT say 'QA → Merge' when QA was skipped"
);
}
@@ -0,0 +1,255 @@
//! Tests for `spawn_stage_notification_subscriber`.
//!
//! Each test triggers real CRDT pipeline transitions so that `TransitionFired`
//! events propagate through the global broadcast channel to the subscriber.
//! Unique story IDs avoid interference between concurrently-running tests.
use super::mock_transport::MockTransport;
use super::stage_subscriber::spawn_stage_notification_subscriber;
use std::path::PathBuf;
fn tmp_root() -> PathBuf {
tempfile::tempdir().unwrap().keep()
}
fn setup_story(item_id: &str, stage_dir: &str, name: &str) {
crate::db::ensure_content_store();
crate::db::write_item_with_content(
item_id,
stage_dir,
&format!("---\nname: {name}\n---\n"),
crate::db::ItemMeta::named(name),
);
}
// ── Backlog → Coding ──────────────────────────────────────────────────────────
/// Transitioning a story from Backlog to Coding must produce a notification
/// that names the story and says "Current".
#[tokio::test]
async fn backlog_to_coding_sends_notification() {
crate::crdt_state::init_for_test();
setup_story("9951_story_b2c", "1_backlog", "Backlog to Coding");
let (transport, calls) = MockTransport::new();
spawn_stage_notification_subscriber(
transport,
|| vec!["!room-b2c:example.org".to_string()],
tmp_root(),
);
crate::agents::lifecycle::move_story_to_current("9951_story_b2c")
.expect("move to current must succeed");
tokio::time::sleep(std::time::Duration::from_millis(350)).await;
let calls = calls.lock().unwrap();
let hit = calls
.iter()
.any(|(_, plain, _)| plain.contains("9951") && plain.contains("Current"));
assert!(
hit,
"Expected a 'Current' notification for story 9951; got: {calls:?}"
);
}
// ── Coding → QA ──────────────────────────────────────────────────────────────
/// Transitioning a story from Coding to QA must produce a notification
/// that names the story and says "QA".
#[tokio::test]
async fn coding_to_qa_sends_notification() {
crate::crdt_state::init_for_test();
setup_story("9952_story_c2q", "2_current", "Coding to QA");
let (transport, calls) = MockTransport::new();
spawn_stage_notification_subscriber(
transport,
|| vec!["!room-c2q:example.org".to_string()],
tmp_root(),
);
crate::agents::lifecycle::move_story_to_qa("9952_story_c2q").expect("move to qa must succeed");
tokio::time::sleep(std::time::Duration::from_millis(350)).await;
let calls = calls.lock().unwrap();
let hit = calls
.iter()
.any(|(_, plain, _)| plain.contains("9952") && plain.contains("QA"));
assert!(
hit,
"Expected a 'QA' notification for story 9952; got: {calls:?}"
);
}
// ── Merge → Done ─────────────────────────────────────────────────────────────
/// Transitioning a story to Done must produce a notification with the party
/// emoji and say "Done".
#[tokio::test]
async fn merge_to_done_sends_party_notification() {
crate::crdt_state::init_for_test();
// Start in Merge stage so we can call move_story_to_done.
setup_story("9953_story_m2d", "4_merge", "Merge to Done");
let (transport, calls) = MockTransport::new();
spawn_stage_notification_subscriber(
transport,
|| vec!["!room-m2d:example.org".to_string()],
tmp_root(),
);
crate::agents::lifecycle::move_story_to_done("9953_story_m2d")
.expect("move to done must succeed");
tokio::time::sleep(std::time::Duration::from_millis(350)).await;
let calls = calls.lock().unwrap();
let hit = calls.iter().any(|(_, plain, _)| {
plain.contains("9953") && plain.contains("Done") && plain.contains('\u{1f389}')
});
assert!(
hit,
"Expected a party-emoji Done notification for story 9953; got: {calls:?}"
);
}
// ── Coding → QA → Merge coalescing ───────────────────────────────────────────
/// Rapid successive transitions for the same item are coalesced so only the
/// final stage is announced in a single notification.
#[tokio::test]
async fn rapid_transitions_are_coalesced() {
crate::crdt_state::init_for_test();
setup_story("9954_story_coalesce", "2_current", "Coalesce Test");
let (transport, calls) = MockTransport::new();
spawn_stage_notification_subscriber(
transport,
|| vec!["!room-coal:example.org".to_string()],
tmp_root(),
);
// Coding → QA → Merge in rapid succession (no sleep between).
crate::agents::lifecycle::move_story_to_qa("9954_story_coalesce")
.expect("move to qa must succeed");
crate::agents::lifecycle::move_story_to_merge("9954_story_coalesce")
.expect("move to merge must succeed");
// Wait for the debounce to flush.
tokio::time::sleep(std::time::Duration::from_millis(350)).await;
let calls = calls.lock().unwrap();
// Filter to only this story's notifications.
let story_calls: Vec<_> = calls
.iter()
.filter(|(_, plain, _)| plain.contains("9954"))
.collect();
// Exactly one notification for this story (the coalesced final stage).
assert_eq!(
story_calls.len(),
1,
"Rapid transitions must be coalesced into one notification; got: {story_calls:?}"
);
// Final destination must be Merge.
assert!(
story_calls[0].1.contains("Merge"),
"Coalesced notification must mention the final stage (Merge); got: {}",
story_calls[0].1
);
}
// ── Dynamic room IDs ──────────────────────────────────────────────────────────
/// The subscriber calls the room-ID closure at notification time, so rooms
/// added after the subscriber is spawned are still reached.
#[tokio::test]
async fn dynamic_room_ids_are_resolved_at_notification_time() {
use std::sync::Arc;
crate::crdt_state::init_for_test();
setup_story("9955_story_dynroom", "1_backlog", "Dynamic Room");
let (transport, calls) = MockTransport::new();
let rooms: Arc<std::sync::Mutex<Vec<String>>> = Arc::new(std::sync::Mutex::new(Vec::new()));
let rooms_for_closure = Arc::clone(&rooms);
spawn_stage_notification_subscriber(
transport,
move || rooms_for_closure.lock().unwrap().clone(),
tmp_root(),
);
// Add a room AFTER spawning the subscriber (simulates WhatsApp first message).
rooms.lock().unwrap().push("phone:+15559990001".to_string());
crate::agents::lifecycle::move_story_to_current("9955_story_dynroom")
.expect("move to current must succeed");
tokio::time::sleep(std::time::Duration::from_millis(350)).await;
let calls = calls.lock().unwrap();
let hit = calls
.iter()
.any(|(room, plain, _)| room == "phone:+15559990001" && plain.contains("9955"));
assert!(
hit,
"Must deliver to the dynamically-added room; got: {calls:?}"
);
}
// ── No rooms → silent ────────────────────────────────────────────────────────
/// When no rooms are registered the subscriber must not panic and must send
/// nothing.
#[tokio::test]
async fn no_rooms_produces_no_notifications() {
crate::crdt_state::init_for_test();
setup_story("9956_story_noroom", "1_backlog", "No Room Test");
let (transport, calls) = MockTransport::new();
spawn_stage_notification_subscriber(transport, Vec::new, tmp_root());
crate::agents::lifecycle::move_story_to_current("9956_story_noroom")
.expect("move to current must succeed");
tokio::time::sleep(std::time::Duration::from_millis(350)).await;
let calls = calls.lock().unwrap();
let hits: Vec<_> = calls
.iter()
.filter(|(_, plain, _)| plain.contains("9956"))
.collect();
assert!(hits.is_empty(), "No rooms means no notifications");
}
// ── story_name included ───────────────────────────────────────────────────────
/// The notification must include the story name read from the CRDT.
#[tokio::test]
async fn notification_includes_story_name() {
crate::crdt_state::init_for_test();
setup_story("9957_story_named", "1_backlog", "Named Story Feature");
let (transport, calls) = MockTransport::new();
spawn_stage_notification_subscriber(
transport,
|| vec!["!room-name:example.org".to_string()],
tmp_root(),
);
crate::agents::lifecycle::move_story_to_current("9957_story_named")
.expect("move to current must succeed");
tokio::time::sleep(std::time::Duration::from_millis(350)).await;
let calls = calls.lock().unwrap();
let hit = calls
.iter()
.any(|(_, plain, _)| plain.contains("9957") && plain.contains("Named Story Feature"));
assert!(
hit,
"Notification must include the story name; got: {calls:?}"
);
}
+1
View File
@@ -22,6 +22,7 @@ pub use format::{
format_blocked_notification, format_error_notification, format_stage_notification,
};
pub use io::spawn_notification_listener;
pub use io::spawn_stage_notification_subscriber;
// ── Error type ────────────────────────────────────────────────────────────────
+24
View File
@@ -226,6 +226,14 @@ pub(crate) fn spawn_notification_listeners(
watcher_rx_for_whatsapp,
root.clone(),
);
{
let ambient_rooms = Arc::clone(&ctx.services.ambient_rooms);
service::notifications::spawn_stage_notification_subscriber(
Arc::clone(&ctx.transport),
move || ambient_rooms.lock().unwrap().iter().cloned().collect(),
root.clone(),
);
}
{
use crate::service::status::format::format_status_event;
@@ -267,6 +275,14 @@ pub(crate) fn spawn_notification_listeners(
watcher_rx_for_slack,
root.clone(),
);
{
let channel_ids: Vec<String> = ctx.channel_ids.iter().cloned().collect();
service::notifications::spawn_stage_notification_subscriber(
Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
move || channel_ids.clone(),
root.clone(),
);
}
{
use crate::service::status::format::format_status_event;
@@ -308,6 +324,14 @@ pub(crate) fn spawn_notification_listeners(
watcher_rx_for_discord,
root.clone(),
);
{
let channel_ids: Vec<String> = ctx.channel_ids.iter().cloned().collect();
service::notifications::spawn_stage_notification_subscriber(
Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
move || channel_ids.clone(),
root.clone(),
);
}
{
use crate::service::status::format::format_status_event;