Files
huskies/server/src/chat/transport/matrix/notifications.rs
T

1013 lines
39 KiB
Rust
Raw Normal View History

//! Pipeline notifications for chat rooms.
//!
//! Subscribes to [`WatcherEvent`] broadcasts and posts a message to every
//! configured room, through the [`ChatTransport`] abstraction, whenever a work
//! item moves between pipeline stages, a merge fails, a story is blocked, or
//! an agent hits a rate limit.
use crate::io::story_metadata::parse_front_matter;
use crate::io::watcher::WatcherEvent;
use crate::slog;
use crate::chat::ChatTransport;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
/// Map a pipeline stage directory name to its human-readable label.
///
/// Any directory name that is not one of the six known stages yields
/// `"Unknown"`.
pub fn stage_display_name(stage: &str) -> &'static str {
    // (directory name, label) pairs for every known pipeline stage.
    const LABELS: [(&str, &str); 6] = [
        ("1_backlog", "Backlog"),
        ("2_current", "Current"),
        ("3_qa", "QA"),
        ("4_merge", "Merge"),
        ("5_done", "Done"),
        ("6_archived", "Archived"),
    ];
    LABELS
        .iter()
        .find(|(dir, _)| *dir == stage)
        .map_or("Unknown", |(_, label)| *label)
}
/// Guess which stage an item came from, given the stage it arrived in.
///
/// The result is the display label of the stage immediately before
/// `to_stage` in the pipeline. `1_backlog` has no predecessor (items are
/// created there rather than moved in), so it yields `None`, as does any
/// unrecognised stage name.
pub fn inferred_from_stage(to_stage: &str) -> Option<&'static str> {
    // (destination directory, label of the preceding stage)
    const PREDECESSORS: [(&str, &str); 5] = [
        ("2_current", "Backlog"),
        ("3_qa", "Current"),
        ("4_merge", "QA"),
        ("5_done", "Merge"),
        ("6_archived", "Done"),
    ];
    PREDECESSORS
        .iter()
        .find_map(|&(dest, previous)| (dest == to_stage).then_some(previous))
}
/// Pull the leading story number out of an item ID such as
/// `"261_story_slug"`.
///
/// The candidate is everything before the first `_` (or the whole ID when
/// there is no underscore). It is returned only when it is non-empty and
/// consists solely of ASCII digits; otherwise the result is `None`.
pub fn extract_story_number(item_id: &str) -> Option<&str> {
    let prefix = match item_id.find('_') {
        Some(underscore) => &item_id[..underscore],
        None => item_id,
    };
    let is_number = !prefix.is_empty() && prefix.bytes().all(|b| b.is_ascii_digit());
    is_number.then_some(prefix)
}
/// Look up a work item's `name` in the YAML front matter of its markdown file.
///
/// The file is expected at `<project_root>/.storkit/work/<stage>/<item_id>.md`.
/// Returns `None` when the file cannot be read, the front matter cannot be
/// parsed, or the front matter carries no name.
pub fn read_story_name(project_root: &Path, stage: &str, item_id: &str) -> Option<String> {
    let mut file = project_root.join(".storkit");
    file.push("work");
    file.push(stage);
    file.push(format!("{item_id}.md"));

    let text = std::fs::read_to_string(&file).ok()?;
    parse_front_matter(&text).ok().and_then(|meta| meta.name)
}
/// Format a stage transition notification message.
///
/// * `item_id` – work item ID; its numeric prefix is shown as `#<number>`
///   (the whole ID is shown when there is no numeric prefix).
/// * `story_name` – human-readable name; falls back to `item_id` when `None`.
/// * `from_stage` / `to_stage` – display names of the stages. A `to_stage`
///   of exactly `"Done"` prefixes the message with a party emoji.
///
/// Returns `(plain_text, html)` suitable for `RoomMessageEventContent::text_html`.
/// Interpolated values are HTML-escaped in the `html` variant only, so a
/// story name containing `<`, `>` or `&` cannot break or inject markup.
pub fn format_stage_notification(
    item_id: &str,
    story_name: Option<&str>,
    from_stage: &str,
    to_stage: &str,
) -> (String, String) {
    // The HTML body is parsed as markup by clients, so interpolated text must
    // have `&`, `<` and `>` escaped; `&` goes first so the entities produced
    // for `<`/`>` are not escaped a second time.
    let escape = |text: &str| {
        text.replace('&', "&amp;")
            .replace('<', "&lt;")
            .replace('>', "&gt;")
    };

    let number = extract_story_number(item_id).unwrap_or(item_id);
    let name = story_name.unwrap_or(item_id);
    let prefix = if to_stage == "Done" { "\u{1f389} " } else { "" };
    let plain = format!("{prefix}#{number} {name} \u{2014} {from_stage} \u{2192} {to_stage}");
    let html = format!(
        "{prefix}<strong>#{number}</strong> <em>{name}</em> \u{2014} {from_stage} \u{2192} {to_stage}",
        number = escape(number),
        name = escape(name),
        from_stage = escape(from_stage),
        to_stage = escape(to_stage),
    );
    (plain, html)
}
/// Format an error notification message for a story failure.
///
/// * `item_id` – work item ID; its numeric prefix is shown as `#<number>`
///   (the whole ID is shown when there is no numeric prefix).
/// * `story_name` – human-readable name; falls back to `item_id` when `None`.
/// * `reason` – free-form failure description, shown verbatim in the plain
///   variant.
///
/// Returns `(plain_text, html)` suitable for `RoomMessageEventContent::text_html`.
/// Interpolated values are HTML-escaped in the `html` variant only, so a
/// reason such as ``expected `Vec<String>` `` is displayed rather than being
/// interpreted as a tag.
pub fn format_error_notification(
    item_id: &str,
    story_name: Option<&str>,
    reason: &str,
) -> (String, String) {
    // The HTML body is parsed as markup by clients, so interpolated text must
    // have `&`, `<` and `>` escaped; `&` goes first so the entities produced
    // for `<`/`>` are not escaped a second time.
    let escape = |text: &str| {
        text.replace('&', "&amp;")
            .replace('<', "&lt;")
            .replace('>', "&gt;")
    };

    let number = extract_story_number(item_id).unwrap_or(item_id);
    let name = story_name.unwrap_or(item_id);
    let plain = format!("\u{274c} #{number} {name} \u{2014} {reason}");
    let html = format!(
        "\u{274c} <strong>#{number}</strong> <em>{name}</em> \u{2014} {reason}",
        number = escape(number),
        name = escape(name),
        reason = escape(reason),
    );
    (plain, html)
}
/// Look for a story's name across the pipeline stage directories.
///
/// Stages are probed in the order `2_current`, `3_qa`, `4_merge`,
/// `1_backlog`, `5_done`, and the first name found wins; `6_archived` is not
/// searched. Used for events (such as rate-limit warnings) that arrive
/// without a known stage.
fn find_story_name_any_stage(project_root: &Path, item_id: &str) -> Option<String> {
    const SEARCH_ORDER: [&str; 5] = ["2_current", "3_qa", "4_merge", "1_backlog", "5_done"];
    SEARCH_ORDER
        .iter()
        .find_map(|stage| read_story_name(project_root, stage, item_id))
}
/// Format a blocked-story notification message.
///
/// * `item_id` – work item ID; its numeric prefix is shown as `#<number>`
///   (the whole ID is shown when there is no numeric prefix).
/// * `story_name` – human-readable name; falls back to `item_id` when `None`.
/// * `reason` – why the story is blocked, shown after a `BLOCKED:` label.
///
/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
/// Interpolated values are HTML-escaped in the `html` variant only, so text
/// containing `<`, `>` or `&` cannot break or inject markup.
pub fn format_blocked_notification(
    item_id: &str,
    story_name: Option<&str>,
    reason: &str,
) -> (String, String) {
    // The HTML body is parsed as markup by clients, so interpolated text must
    // have `&`, `<` and `>` escaped; `&` goes first so the entities produced
    // for `<`/`>` are not escaped a second time.
    let escape = |text: &str| {
        text.replace('&', "&amp;")
            .replace('<', "&lt;")
            .replace('>', "&gt;")
    };

    let number = extract_story_number(item_id).unwrap_or(item_id);
    let name = story_name.unwrap_or(item_id);
    let plain = format!("\u{1f6ab} #{number} {name} \u{2014} BLOCKED: {reason}");
    let html = format!(
        "\u{1f6ab} <strong>#{number}</strong> <em>{name}</em> \u{2014} BLOCKED: {reason}",
        number = escape(number),
        name = escape(name),
        reason = escape(reason),
    );
    (plain, html)
}
/// Minimum time between rate-limit notifications for the same
/// `story_id:agent_name` pair. Rate-limit warnings and hard blocks are
/// debounced against the same key, so either kind suppresses the other
/// within this window.
const RATE_LIMIT_DEBOUNCE: Duration = Duration::from_secs(60);
/// Window during which rapid stage transitions for the same item are coalesced
/// into a single notification (only the final stage is announced). Each new
/// work-item transition restarts the window.
const STAGE_TRANSITION_DEBOUNCE: Duration = Duration::from_millis(200);
/// Format a rate limit hard block notification message with scheduled resume time.
///
/// * `item_id` – work item ID; its numeric prefix is shown as `#<number>`
///   (the whole ID is shown when there is no numeric prefix).
/// * `story_name` – human-readable name; falls back to `item_id` when `None`.
/// * `agent_name` – the agent that hit the limit.
/// * `resume_at` – UTC instant at which work resumes; rendered in the
///   process's local time zone as `YYYY-MM-DD HH:MM`.
///
/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
/// Interpolated values are HTML-escaped in the `html` variant only, so text
/// containing `<`, `>` or `&` cannot break or inject markup.
pub fn format_rate_limit_hard_block_notification(
    item_id: &str,
    story_name: Option<&str>,
    agent_name: &str,
    resume_at: chrono::DateTime<chrono::Utc>,
) -> (String, String) {
    // The HTML body is parsed as markup by clients, so interpolated text must
    // have `&`, `<` and `>` escaped; `&` goes first so the entities produced
    // for `<`/`>` are not escaped a second time.
    let escape = |text: &str| {
        text.replace('&', "&amp;")
            .replace('<', "&lt;")
            .replace('>', "&gt;")
    };

    let number = extract_story_number(item_id).unwrap_or(item_id);
    let name = story_name.unwrap_or(item_id);
    let local_time = resume_at.with_timezone(&chrono::Local);
    // The timestamp format yields only digits, `-`, `:` and spaces, so it
    // needs no escaping.
    let resume_str = local_time.format("%Y-%m-%d %H:%M").to_string();
    let plain = format!(
        "\u{1f6d1} #{number} {name} \u{2014} {agent_name} hit a hard rate limit; \
         will auto-resume at {resume_str}"
    );
    let html = format!(
        "\u{1f6d1} <strong>#{number}</strong> <em>{name}</em> \u{2014} \
         {agent_name} hit a hard rate limit; will auto-resume at {resume_str}",
        number = escape(number),
        name = escape(name),
        agent_name = escape(agent_name),
    );
    (plain, html)
}
/// Format a rate limit warning notification message.
///
/// * `item_id` – work item ID; its numeric prefix is shown as `#<number>`
///   (the whole ID is shown when there is no numeric prefix).
/// * `story_name` – human-readable name; falls back to `item_id` when `None`.
/// * `agent_name` – the agent that hit the limit.
///
/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
/// Interpolated values are HTML-escaped in the `html` variant only, so text
/// containing `<`, `>` or `&` cannot break or inject markup.
pub fn format_rate_limit_notification(
    item_id: &str,
    story_name: Option<&str>,
    agent_name: &str,
) -> (String, String) {
    // The HTML body is parsed as markup by clients, so interpolated text must
    // have `&`, `<` and `>` escaped; `&` goes first so the entities produced
    // for `<`/`>` are not escaped a second time.
    let escape = |text: &str| {
        text.replace('&', "&amp;")
            .replace('<', "&lt;")
            .replace('>', "&gt;")
    };

    let number = extract_story_number(item_id).unwrap_or(item_id);
    let name = story_name.unwrap_or(item_id);
    let plain = format!(
        "\u{26a0}\u{fe0f} #{number} {name} \u{2014} {agent_name} hit an API rate limit"
    );
    let html = format!(
        "\u{26a0}\u{fe0f} <strong>#{number}</strong> <em>{name}</em> \u{2014} \
         {agent_name} hit an API rate limit",
        number = escape(number),
        name = escape(name),
        agent_name = escape(agent_name),
    );
    (plain, html)
}
/// Spawn a background task that listens for watcher events and posts
/// stage-transition notifications to all configured rooms via the
/// [`ChatTransport`] abstraction.
///
/// `get_room_ids` is called on each notification to obtain the current list of
/// destination room IDs. Pass a closure that returns a static list for Matrix
/// and Slack, or one that reads from a runtime `Arc<Mutex<HashSet<String>>>`
/// for WhatsApp ambient senders.
pub fn spawn_notification_listener(
transport: Arc<dyn ChatTransport>,
get_room_ids: impl Fn() -> Vec<String> + Send + 'static,
watcher_rx: broadcast::Receiver<WatcherEvent>,
project_root: PathBuf,
) {
tokio::spawn(async move {
let mut rx = watcher_rx;
// Tracks when a rate-limit notification was last sent for each
// "story_id:agent_name" key, to debounce repeated warnings.
let mut rate_limit_last_notified: HashMap<String, Instant> = HashMap::new();
// Pending stage-transition notifications, keyed by item_id.
// Value: (from_display, to_stage_key, story_name).
// Rapid successive transitions for the same item are coalesced: the
// original from_display is kept while to_stage_key is updated to the
// latest destination, so only one notification fires for the final stage.
let mut pending_transitions: HashMap<String, (String, String, Option<String>)> =
HashMap::new();
let mut flush_deadline: Option<tokio::time::Instant> = None;
loop {
// Wait for the next event, or flush pending transitions when the
// debounce window expires.
let recv_result = if let Some(deadline) = flush_deadline {
tokio::time::timeout_at(deadline, rx.recv()).await.ok()
} else {
Some(rx.recv().await)
};
if recv_result.is_none() {
// Flush all coalesced stage-transition notifications.
for (item_id, (from_display, to_stage_key, story_name)) in
pending_transitions.drain()
{
let to_display = stage_display_name(&to_stage_key);
let (plain, html) = format_stage_notification(
&item_id,
story_name.as_deref(),
&from_display,
to_display,
);
slog!("[bot] Sending stage notification: {plain}");
for room_id in &get_room_ids() {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
slog!("[bot] Failed to send notification to {room_id}: {e}");
}
}
}
flush_deadline = None;
continue;
}
match recv_result.unwrap() {
Ok(WatcherEvent::WorkItem {
ref stage,
ref item_id,
ref from_stage,
..
}) => {
// Determine from_display: prefer the actual from_stage recorded
// in the event (AC3); fall back to inference for synthetic events.
let from_display = from_stage
.as_deref()
.map(stage_display_name)
.or_else(|| inferred_from_stage(stage));
let Some(from_display) = from_display else {
continue; // creation or unknown transition — skip
};
// Look up the story name in the expected stage directory; fall
// back to a full search so stale events still show the name (AC1).
let story_name = read_story_name(&project_root, stage, item_id)
.or_else(|| find_story_name_any_stage(&project_root, item_id));
// Buffer the transition. If this item_id is already pending (rapid
// succession), update to_stage_key to the latest destination while
// preserving the original from_display (AC2).
pending_transitions
.entry(item_id.clone())
.and_modify(|e| {
e.1 = stage.clone();
if story_name.is_some() {
e.2 = story_name.clone();
}
})
.or_insert_with(|| {
(from_display.to_string(), stage.clone(), story_name)
});
// Start or extend the debounce window.
flush_deadline =
Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE);
}
Ok(WatcherEvent::MergeFailure {
ref story_id,
ref reason,
}) => {
let story_name =
read_story_name(&project_root, "4_merge", story_id);
let (plain, html) = format_error_notification(
story_id,
story_name.as_deref(),
reason,
);
slog!("[bot] Sending error notification: {plain}");
for room_id in &get_room_ids() {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
slog!(
"[bot] Failed to send error notification to {room_id}: {e}"
);
}
}
}
Ok(WatcherEvent::RateLimitWarning {
ref story_id,
ref agent_name,
}) => {
// Debounce: skip if we sent a notification for this agent
// within the last RATE_LIMIT_DEBOUNCE seconds.
let debounce_key = format!("{story_id}:{agent_name}");
let now = Instant::now();
if let Some(&last) = rate_limit_last_notified.get(&debounce_key)
&& now.duration_since(last) < RATE_LIMIT_DEBOUNCE
{
slog!(
"[bot] Rate-limit notification debounced for \
{story_id}:{agent_name}"
);
continue;
}
rate_limit_last_notified.insert(debounce_key, now);
let story_name = find_story_name_any_stage(&project_root, story_id);
let (plain, html) = format_rate_limit_notification(
story_id,
story_name.as_deref(),
agent_name,
);
slog!("[bot] Sending rate-limit notification: {plain}");
for room_id in &get_room_ids() {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
slog!(
"[bot] Failed to send rate-limit notification \
to {room_id}: {e}"
);
}
}
}
Ok(WatcherEvent::StoryBlocked {
ref story_id,
ref reason,
}) => {
let story_name = find_story_name_any_stage(&project_root, story_id);
let (plain, html) = format_blocked_notification(
story_id,
story_name.as_deref(),
reason,
);
slog!("[bot] Sending blocked notification: {plain}");
for room_id in &get_room_ids() {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
slog!(
"[bot] Failed to send blocked notification to {room_id}: {e}"
);
}
}
}
Ok(WatcherEvent::RateLimitHardBlock {
ref story_id,
ref agent_name,
reset_at,
}) => {
// Debounce: reuse the same key as RateLimitWarning so both
// types are rate-limited together for the same agent.
let debounce_key = format!("{story_id}:{agent_name}");
let now = Instant::now();
if let Some(&last) = rate_limit_last_notified.get(&debounce_key)
&& now.duration_since(last) < RATE_LIMIT_DEBOUNCE
{
slog!(
"[bot] Rate-limit hard-block notification debounced for \
{story_id}:{agent_name}"
);
continue;
}
rate_limit_last_notified.insert(debounce_key, now);
let story_name = find_story_name_any_stage(&project_root, story_id);
let (plain, html) = format_rate_limit_hard_block_notification(
story_id,
story_name.as_deref(),
agent_name,
reset_at,
);
slog!("[bot] Sending rate-limit hard-block notification: {plain}");
for room_id in &get_room_ids() {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
slog!(
"[bot] Failed to send rate-limit hard-block notification \
to {room_id}: {e}"
);
}
}
}
Ok(_) => {} // Ignore non-work-item events
Err(broadcast::error::RecvError::Lagged(n)) => {
slog!(
"[bot] Notification listener lagged, skipped {n} events"
);
}
Err(broadcast::error::RecvError::Closed) => {
slog!(
"[bot] Watcher channel closed, stopping notification listener"
);
// Flush any coalesced transitions that haven't fired yet.
for (item_id, (from_display, to_stage_key, story_name)) in
pending_transitions.drain()
{
let to_display = stage_display_name(&to_stage_key);
let (plain, html) = format_stage_notification(
&item_id,
story_name.as_deref(),
&from_display,
to_display,
);
slog!("[bot] Sending stage notification: {plain}");
for room_id in &get_room_ids() {
if let Err(e) =
transport.send_message(room_id, &plain, &html).await
{
slog!(
"[bot] Failed to send notification to {room_id}: {e}"
);
}
}
}
break;
}
}
}
});
}
/// Unit tests for the formatting helpers and the background listener.
///
/// Listener tests drive a real spawned task through a broadcast channel and
/// then sleep briefly before inspecting what the mock transport recorded.
/// Tests that involve stage transitions must sleep longer than
/// `STAGE_TRANSITION_DEBOUNCE` (200ms), otherwise the buffered notification
/// has not been flushed yet when the assertions run.
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use crate::chat::MessageId;

    // ── MockTransport ───────────────────────────────────────────────────────

    /// Shared log of `(room_id, plain, html)` triples, one per send.
    type CallLog = Arc<std::sync::Mutex<Vec<(String, String, String)>>>;

    /// Records every `send_message` call for inspection in tests.
    struct MockTransport {
        calls: CallLog,
    }

    impl MockTransport {
        /// Build a transport together with a handle to its call log.
        fn new() -> (Arc<Self>, CallLog) {
            let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new()));
            (Arc::new(Self { calls: Arc::clone(&calls) }), calls)
        }
    }

    #[async_trait]
    impl crate::chat::ChatTransport for MockTransport {
        async fn send_message(&self, room_id: &str, plain: &str, html: &str) -> Result<MessageId, String> {
            self.calls.lock().unwrap().push((room_id.to_string(), plain.to_string(), html.to_string()));
            Ok("mock-msg-id".to_string())
        }

        async fn edit_message(&self, _room_id: &str, _id: &str, _plain: &str, _html: &str) -> Result<(), String> {
            Ok(())
        }

        async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
            Ok(())
        }
    }

    // ── spawn_notification_listener: RateLimitWarning ───────────────────────

    /// AC2 + AC3: when a RateLimitWarning event arrives, send_message is called
    /// with a notification that names the agent and story.
    #[tokio::test]
    async fn rate_limit_warning_sends_notification_with_agent_and_story() {
        let tmp = tempfile::tempdir().unwrap();
        let stage_dir = tmp.path().join(".storkit").join("work").join("2_current");
        std::fs::create_dir_all(&stage_dir).unwrap();
        std::fs::write(
            stage_dir.join("365_story_rate_limit.md"),
            "---\nname: Rate Limit Test Story\n---\n",
        )
        .unwrap();

        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
        let (transport, calls) = MockTransport::new();
        spawn_notification_listener(
            transport,
            || vec!["!room123:example.org".to_string()],
            watcher_rx,
            tmp.path().to_path_buf(),
        );

        watcher_tx.send(WatcherEvent::RateLimitWarning {
            story_id: "365_story_rate_limit".to_string(),
            agent_name: "coder-1".to_string(),
        }).unwrap();

        // Give the spawned task time to process the event.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 1, "Expected exactly one notification");
        let (room_id, plain, _html) = &calls[0];
        assert_eq!(room_id, "!room123:example.org");
        assert!(plain.contains("365"), "plain should contain story number");
        assert!(plain.contains("Rate Limit Test Story"), "plain should contain story name");
        assert!(plain.contains("coder-1"), "plain should contain agent name");
        assert!(plain.contains("rate limit"), "plain should mention rate limit");
    }

    /// AC4: a second RateLimitWarning for the same agent within the debounce
    /// window must NOT trigger a second notification.
    #[tokio::test]
    async fn rate_limit_warning_is_debounced() {
        let tmp = tempfile::tempdir().unwrap();
        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
        let (transport, calls) = MockTransport::new();
        spawn_notification_listener(
            transport,
            || vec!["!room1:example.org".to_string()],
            watcher_rx,
            tmp.path().to_path_buf(),
        );

        // Send the same warning twice in rapid succession.
        for _ in 0..2 {
            watcher_tx.send(WatcherEvent::RateLimitWarning {
                story_id: "42_story_debounce".to_string(),
                agent_name: "coder-2".to_string(),
            }).unwrap();
        }

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 1, "Debounce should suppress the second notification");
    }

    /// AC4 (corollary): warnings for different agents are NOT debounced against
    /// each other — both should produce notifications.
    #[tokio::test]
    async fn rate_limit_warnings_for_different_agents_both_notify() {
        let tmp = tempfile::tempdir().unwrap();
        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
        let (transport, calls) = MockTransport::new();
        spawn_notification_listener(
            transport,
            || vec!["!room1:example.org".to_string()],
            watcher_rx,
            tmp.path().to_path_buf(),
        );

        watcher_tx.send(WatcherEvent::RateLimitWarning {
            story_id: "42_story_foo".to_string(),
            agent_name: "coder-1".to_string(),
        }).unwrap();
        watcher_tx.send(WatcherEvent::RateLimitWarning {
            story_id: "42_story_foo".to_string(),
            agent_name: "coder-2".to_string(),
        }).unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 2, "Different agents should each trigger a notification");
    }

    // ── dynamic room IDs (WhatsApp ambient_rooms pattern) ───────────────────

    /// Notifications are sent to the rooms returned by the closure at
    /// notification time, not at listener-spawn time. This verifies that a
    /// closure backed by a runtime set (e.g. WhatsApp ambient_rooms) delivers
    /// messages to the rooms present when the event fires.
    #[tokio::test]
    async fn stage_notification_uses_dynamic_room_ids() {
        let tmp = tempfile::tempdir().unwrap();
        let stage_dir = tmp.path().join(".storkit").join("work").join("3_qa");
        std::fs::create_dir_all(&stage_dir).unwrap();
        std::fs::write(
            stage_dir.join("10_story_foo.md"),
            "---\nname: Foo Story\n---\n",
        )
        .unwrap();

        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
        let (transport, calls) = MockTransport::new();
        let rooms: Arc<std::sync::Mutex<std::collections::HashSet<String>>> =
            Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()));
        let rooms_for_closure = Arc::clone(&rooms);
        spawn_notification_listener(
            transport,
            move || rooms_for_closure.lock().unwrap().iter().cloned().collect(),
            watcher_rx,
            tmp.path().to_path_buf(),
        );

        // Add a room after the listener is spawned (simulates a user messaging first).
        rooms.lock().unwrap().insert("phone:+15551234567".to_string());

        watcher_tx.send(WatcherEvent::WorkItem {
            stage: "3_qa".to_string(),
            item_id: "10_story_foo".to_string(),
            action: "qa".to_string(),
            commit_msg: "storkit: qa 10_story_foo".to_string(),
            from_stage: None,
        }).unwrap();

        // Wait longer than STAGE_TRANSITION_DEBOUNCE (200ms) so the coalesced
        // notification flushes.
        tokio::time::sleep(std::time::Duration::from_millis(350)).await;

        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 1, "Should deliver to the dynamically added room");
        assert_eq!(calls[0].0, "phone:+15551234567");
        assert!(calls[0].1.contains("10"), "plain should contain story number");
        assert!(calls[0].1.contains("Foo Story"), "plain should contain story name");
    }

    /// When no rooms are registered (e.g. no WhatsApp users have messaged yet),
    /// no notifications are sent and the listener does not panic.
    #[tokio::test]
    async fn stage_notification_with_no_rooms_is_silent() {
        let tmp = tempfile::tempdir().unwrap();
        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
        let (transport, calls) = MockTransport::new();
        spawn_notification_listener(
            transport,
            Vec::new,
            watcher_rx,
            tmp.path().to_path_buf(),
        );

        watcher_tx.send(WatcherEvent::WorkItem {
            stage: "3_qa".to_string(),
            item_id: "10_story_foo".to_string(),
            action: "qa".to_string(),
            commit_msg: "storkit: qa 10_story_foo".to_string(),
            from_stage: None,
        }).unwrap();

        // Wait longer than STAGE_TRANSITION_DEBOUNCE (200ms): with a shorter
        // wait the flush would not have run yet and the assertion below would
        // hold no matter how many rooms were configured.
        tokio::time::sleep(std::time::Duration::from_millis(350)).await;

        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 0, "No rooms means no notifications");
    }

    /// Rapid successive transitions for the same item collapse into a single
    /// notification that keeps the first origin and announces the final stage.
    #[tokio::test]
    async fn rapid_stage_transitions_are_coalesced() {
        let tmp = tempfile::tempdir().unwrap();
        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
        let (transport, calls) = MockTransport::new();
        spawn_notification_listener(
            transport,
            || vec!["!room1:example.org".to_string()],
            watcher_rx,
            tmp.path().to_path_buf(),
        );

        // Backlog → Current immediately followed by Current → QA.
        for stage in ["2_current", "3_qa"] {
            watcher_tx.send(WatcherEvent::WorkItem {
                stage: stage.to_string(),
                item_id: "10_story_foo".to_string(),
                action: "move".to_string(),
                commit_msg: format!("storkit: move 10_story_foo to {stage}"),
                from_stage: None,
            }).unwrap();
        }

        // Wait longer than STAGE_TRANSITION_DEBOUNCE (200ms) so the coalesced
        // notification flushes.
        tokio::time::sleep(std::time::Duration::from_millis(350)).await;

        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 1, "Rapid transitions should produce one notification");
        // No story file exists, so the item ID doubles as the name.
        assert_eq!(
            calls[0].1,
            "#10 10_story_foo \u{2014} Backlog \u{2192} QA"
        );
    }

    // ── stage_display_name ──────────────────────────────────────────────────

    #[test]
    fn stage_display_name_maps_all_known_stages() {
        assert_eq!(stage_display_name("1_backlog"), "Backlog");
        assert_eq!(stage_display_name("2_current"), "Current");
        assert_eq!(stage_display_name("3_qa"), "QA");
        assert_eq!(stage_display_name("4_merge"), "Merge");
        assert_eq!(stage_display_name("5_done"), "Done");
        assert_eq!(stage_display_name("6_archived"), "Archived");
        assert_eq!(stage_display_name("unknown"), "Unknown");
    }

    // ── inferred_from_stage ─────────────────────────────────────────────────

    #[test]
    fn inferred_from_stage_returns_previous_stage() {
        assert_eq!(inferred_from_stage("2_current"), Some("Backlog"));
        assert_eq!(inferred_from_stage("3_qa"), Some("Current"));
        assert_eq!(inferred_from_stage("4_merge"), Some("QA"));
        assert_eq!(inferred_from_stage("5_done"), Some("Merge"));
        assert_eq!(inferred_from_stage("6_archived"), Some("Done"));
    }

    #[test]
    fn inferred_from_stage_returns_none_for_backlog() {
        assert_eq!(inferred_from_stage("1_backlog"), None);
    }

    #[test]
    fn inferred_from_stage_returns_none_for_unknown() {
        assert_eq!(inferred_from_stage("9_unknown"), None);
    }

    // ── extract_story_number ────────────────────────────────────────────────

    #[test]
    fn extract_story_number_parses_numeric_prefix() {
        assert_eq!(
            extract_story_number("261_story_bot_notifications"),
            Some("261")
        );
        assert_eq!(extract_story_number("42_bug_fix_thing"), Some("42"));
        assert_eq!(extract_story_number("1_spike_research"), Some("1"));
    }

    #[test]
    fn extract_story_number_returns_none_for_non_numeric() {
        assert_eq!(extract_story_number("abc_story_thing"), None);
        assert_eq!(extract_story_number(""), None);
    }

    // ── read_story_name ─────────────────────────────────────────────────────

    #[test]
    fn read_story_name_reads_from_front_matter() {
        let tmp = tempfile::tempdir().unwrap();
        let stage_dir = tmp
            .path()
            .join(".storkit")
            .join("work")
            .join("2_current");
        std::fs::create_dir_all(&stage_dir).unwrap();
        std::fs::write(
            stage_dir.join("42_story_my_feature.md"),
            "---\nname: My Cool Feature\n---\n# Story\n",
        )
        .unwrap();

        let name = read_story_name(tmp.path(), "2_current", "42_story_my_feature");
        assert_eq!(name.as_deref(), Some("My Cool Feature"));
    }

    #[test]
    fn read_story_name_returns_none_for_missing_file() {
        let tmp = tempfile::tempdir().unwrap();
        let name = read_story_name(tmp.path(), "2_current", "99_story_missing");
        assert_eq!(name, None);
    }

    #[test]
    fn read_story_name_returns_none_for_missing_name_field() {
        let tmp = tempfile::tempdir().unwrap();
        let stage_dir = tmp
            .path()
            .join(".storkit")
            .join("work")
            .join("2_current");
        std::fs::create_dir_all(&stage_dir).unwrap();
        std::fs::write(
            stage_dir.join("42_story_no_name.md"),
            "---\ncoverage_baseline: 50%\n---\n# Story\n",
        )
        .unwrap();

        let name = read_story_name(tmp.path(), "2_current", "42_story_no_name");
        assert_eq!(name, None);
    }

    // ── format_error_notification ────────────────────────────────────────────

    #[test]
    fn format_error_notification_with_story_name() {
        let (plain, html) =
            format_error_notification("262_story_bot_errors", Some("Bot error notifications"), "merge conflict in src/main.rs");
        assert_eq!(
            plain,
            "\u{274c} #262 Bot error notifications \u{2014} merge conflict in src/main.rs"
        );
        assert_eq!(
            html,
            "\u{274c} <strong>#262</strong> <em>Bot error notifications</em> \u{2014} merge conflict in src/main.rs"
        );
    }

    #[test]
    fn format_error_notification_without_story_name_falls_back_to_item_id() {
        let (plain, _html) =
            format_error_notification("42_bug_fix_thing", None, "tests failed");
        assert_eq!(
            plain,
            "\u{274c} #42 42_bug_fix_thing \u{2014} tests failed"
        );
    }

    #[test]
    fn format_error_notification_non_numeric_id_uses_full_id() {
        let (plain, _html) =
            format_error_notification("abc_story_thing", Some("Some Story"), "clippy errors");
        assert_eq!(
            plain,
            "\u{274c} #abc_story_thing Some Story \u{2014} clippy errors"
        );
    }

    // ── format_blocked_notification ─────────────────────────────────────────

    #[test]
    fn format_blocked_notification_with_story_name() {
        let (plain, html) = format_blocked_notification(
            "425_story_blocking_reason",
            Some("Blocking Reason Story"),
            "Retry limit exceeded (3/3) at coder stage",
        );
        assert_eq!(
            plain,
            "\u{1f6ab} #425 Blocking Reason Story \u{2014} BLOCKED: Retry limit exceeded (3/3) at coder stage"
        );
        assert_eq!(
            html,
            "\u{1f6ab} <strong>#425</strong> <em>Blocking Reason Story</em> \u{2014} BLOCKED: Retry limit exceeded (3/3) at coder stage"
        );
    }

    #[test]
    fn format_blocked_notification_falls_back_to_item_id() {
        let (plain, _html) =
            format_blocked_notification("42_story_thing", None, "empty diff");
        assert_eq!(
            plain,
            "\u{1f6ab} #42 42_story_thing \u{2014} BLOCKED: empty diff"
        );
    }

    // ── spawn_notification_listener: StoryBlocked ───────────────────────────

    /// AC1: when a StoryBlocked event arrives, send_message is called with a
    /// notification that includes the story number, name, and reason.
    #[tokio::test]
    async fn story_blocked_sends_notification_with_reason() {
        let tmp = tempfile::tempdir().unwrap();
        let stage_dir = tmp.path().join(".storkit").join("work").join("2_current");
        std::fs::create_dir_all(&stage_dir).unwrap();
        std::fs::write(
            stage_dir.join("425_story_blocking_test.md"),
            "---\nname: Blocking Test Story\n---\n",
        )
        .unwrap();

        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
        let (transport, calls) = MockTransport::new();
        spawn_notification_listener(
            transport,
            || vec!["!room123:example.org".to_string()],
            watcher_rx,
            tmp.path().to_path_buf(),
        );

        watcher_tx.send(WatcherEvent::StoryBlocked {
            story_id: "425_story_blocking_test".to_string(),
            reason: "Retry limit exceeded (3/3) at coder stage".to_string(),
        }).unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 1, "Expected exactly one notification");
        let (room_id, plain, html) = &calls[0];
        assert_eq!(room_id, "!room123:example.org");
        assert!(plain.contains("425"), "plain should contain story number");
        assert!(plain.contains("Blocking Test Story"), "plain should contain story name");
        assert!(plain.contains("BLOCKED"), "plain should contain BLOCKED label");
        assert!(plain.contains("Retry limit exceeded"), "plain should contain the reason");
        assert!(html.contains("BLOCKED"), "html should contain BLOCKED label");
    }

    /// StoryBlocked with no room registered should not panic.
    #[tokio::test]
    async fn story_blocked_with_no_rooms_is_silent() {
        let tmp = tempfile::tempdir().unwrap();
        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
        let (transport, calls) = MockTransport::new();
        spawn_notification_listener(
            transport,
            Vec::new,
            watcher_rx,
            tmp.path().to_path_buf(),
        );

        watcher_tx.send(WatcherEvent::StoryBlocked {
            story_id: "42_story_no_rooms".to_string(),
            reason: "empty diff".to_string(),
        }).unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 0, "No rooms means no notifications");
    }

    // ── format_rate_limit_notification ─────────────────────────────────────

    #[test]
    fn format_rate_limit_notification_includes_agent_and_story() {
        let (plain, html) = format_rate_limit_notification(
            "365_story_my_feature",
            Some("My Feature"),
            "coder-2",
        );
        assert_eq!(
            plain,
            "\u{26a0}\u{fe0f} #365 My Feature \u{2014} coder-2 hit an API rate limit"
        );
        assert_eq!(
            html,
            "\u{26a0}\u{fe0f} <strong>#365</strong> <em>My Feature</em> \u{2014} coder-2 hit an API rate limit"
        );
    }

    #[test]
    fn format_rate_limit_notification_falls_back_to_item_id() {
        let (plain, _html) =
            format_rate_limit_notification("42_story_thing", None, "coder-1");
        assert_eq!(
            plain,
            "\u{26a0}\u{fe0f} #42 42_story_thing \u{2014} coder-1 hit an API rate limit"
        );
    }

    // ── format_stage_notification ───────────────────────────────────────────

    #[test]
    fn format_notification_done_stage_includes_party_emoji() {
        let (plain, html) = format_stage_notification(
            "353_story_done",
            Some("Done Story"),
            "Merge",
            "Done",
        );
        assert_eq!(
            plain,
            "\u{1f389} #353 Done Story \u{2014} Merge \u{2192} Done"
        );
        assert_eq!(
            html,
            "\u{1f389} <strong>#353</strong> <em>Done Story</em> \u{2014} Merge \u{2192} Done"
        );
    }

    #[test]
    fn format_notification_non_done_stage_has_no_emoji() {
        let (plain, _html) = format_stage_notification(
            "42_story_thing",
            Some("Some Story"),
            "Backlog",
            "Current",
        );
        assert!(!plain.contains("\u{1f389}"));
    }

    #[test]
    fn format_notification_with_story_name() {
        let (plain, html) = format_stage_notification(
            "261_story_bot_notifications",
            Some("Bot notifications"),
            "Upcoming",
            "Current",
        );
        assert_eq!(
            plain,
            "#261 Bot notifications \u{2014} Upcoming \u{2192} Current"
        );
        assert_eq!(
            html,
            "<strong>#261</strong> <em>Bot notifications</em> \u{2014} Upcoming \u{2192} Current"
        );
    }

    #[test]
    fn format_notification_without_story_name_falls_back_to_item_id() {
        let (plain, _html) = format_stage_notification(
            "42_bug_fix_thing",
            None,
            "Current",
            "QA",
        );
        assert_eq!(
            plain,
            "#42 42_bug_fix_thing \u{2014} Current \u{2192} QA"
        );
    }

    #[test]
    fn format_notification_non_numeric_id_uses_full_id() {
        let (plain, _html) = format_stage_notification(
            "abc_story_thing",
            Some("Some Story"),
            "QA",
            "Merge",
        );
        assert_eq!(
            plain,
            "#abc_story_thing Some Story \u{2014} QA \u{2192} Merge"
        );
    }
}