storkit: merge 462_bug_stage_transition_notifications_can_arrive_out_of_order_and_show_wrong_story_name
This commit is contained in:
@@ -539,6 +539,7 @@ impl AgentPool {
|
|||||||
item_id: sid.clone(),
|
item_id: sid.clone(),
|
||||||
action: "reassign".to_string(),
|
action: "reassign".to_string(),
|
||||||
commit_msg: String::new(),
|
commit_msg: String::new(),
|
||||||
|
from_stage: None,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -136,6 +136,10 @@ pub fn format_blocked_notification(
|
|||||||
/// Minimum time between rate-limit notifications for the same agent.
|
/// Minimum time between rate-limit notifications for the same agent.
|
||||||
const RATE_LIMIT_DEBOUNCE: Duration = Duration::from_secs(60);
|
const RATE_LIMIT_DEBOUNCE: Duration = Duration::from_secs(60);
|
||||||
|
|
||||||
|
/// Window during which rapid stage transitions for the same item are coalesced
|
||||||
|
/// into a single notification (only the final stage is announced).
|
||||||
|
const STAGE_TRANSITION_DEBOUNCE: Duration = Duration::from_millis(200);
|
||||||
|
|
||||||
/// Format a rate limit hard block notification message with scheduled resume time.
|
/// Format a rate limit hard block notification message with scheduled resume time.
|
||||||
///
|
///
|
||||||
/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
|
/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
|
||||||
@@ -202,36 +206,87 @@ pub fn spawn_notification_listener(
|
|||||||
// "story_id:agent_name" key, to debounce repeated warnings.
|
// "story_id:agent_name" key, to debounce repeated warnings.
|
||||||
let mut rate_limit_last_notified: HashMap<String, Instant> = HashMap::new();
|
let mut rate_limit_last_notified: HashMap<String, Instant> = HashMap::new();
|
||||||
|
|
||||||
|
// Pending stage-transition notifications, keyed by item_id.
|
||||||
|
// Value: (from_display, to_stage_key, story_name).
|
||||||
|
// Rapid successive transitions for the same item are coalesced: the
|
||||||
|
// original from_display is kept while to_stage_key is updated to the
|
||||||
|
// latest destination, so only one notification fires for the final stage.
|
||||||
|
let mut pending_transitions: HashMap<String, (String, String, Option<String>)> =
|
||||||
|
HashMap::new();
|
||||||
|
let mut flush_deadline: Option<tokio::time::Instant> = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match rx.recv().await {
|
// Wait for the next event, or flush pending transitions when the
|
||||||
|
// debounce window expires.
|
||||||
|
let recv_result = if let Some(deadline) = flush_deadline {
|
||||||
|
tokio::time::timeout_at(deadline, rx.recv()).await.ok()
|
||||||
|
} else {
|
||||||
|
Some(rx.recv().await)
|
||||||
|
};
|
||||||
|
|
||||||
|
if recv_result.is_none() {
|
||||||
|
// Flush all coalesced stage-transition notifications.
|
||||||
|
for (item_id, (from_display, to_stage_key, story_name)) in
|
||||||
|
pending_transitions.drain()
|
||||||
|
{
|
||||||
|
let to_display = stage_display_name(&to_stage_key);
|
||||||
|
let (plain, html) = format_stage_notification(
|
||||||
|
&item_id,
|
||||||
|
story_name.as_deref(),
|
||||||
|
&from_display,
|
||||||
|
to_display,
|
||||||
|
);
|
||||||
|
slog!("[bot] Sending stage notification: {plain}");
|
||||||
|
for room_id in &get_room_ids() {
|
||||||
|
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
|
||||||
|
slog!("[bot] Failed to send notification to {room_id}: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
flush_deadline = None;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match recv_result.unwrap() {
|
||||||
Ok(WatcherEvent::WorkItem {
|
Ok(WatcherEvent::WorkItem {
|
||||||
ref stage,
|
ref stage,
|
||||||
ref item_id,
|
ref item_id,
|
||||||
|
ref from_stage,
|
||||||
..
|
..
|
||||||
}) => {
|
}) => {
|
||||||
// Only notify on stage transitions, not creations.
|
// Determine from_display: prefer the actual from_stage recorded
|
||||||
let Some(from_display) = inferred_from_stage(stage) else {
|
// in the event (AC3); fall back to inference for synthetic events.
|
||||||
continue;
|
let from_display = from_stage
|
||||||
|
.as_deref()
|
||||||
|
.map(stage_display_name)
|
||||||
|
.or_else(|| inferred_from_stage(stage));
|
||||||
|
let Some(from_display) = from_display else {
|
||||||
|
continue; // creation or unknown transition — skip
|
||||||
};
|
};
|
||||||
let to_display = stage_display_name(stage);
|
|
||||||
|
|
||||||
let story_name = read_story_name(&project_root, stage, item_id);
|
// Look up the story name in the expected stage directory; fall
|
||||||
let (plain, html) = format_stage_notification(
|
// back to a full search so stale events still show the name (AC1).
|
||||||
item_id,
|
let story_name = read_story_name(&project_root, stage, item_id)
|
||||||
story_name.as_deref(),
|
.or_else(|| find_story_name_any_stage(&project_root, item_id));
|
||||||
from_display,
|
|
||||||
to_display,
|
|
||||||
);
|
|
||||||
|
|
||||||
slog!("[bot] Sending stage notification: {plain}");
|
// Buffer the transition. If this item_id is already pending (rapid
|
||||||
|
// succession), update to_stage_key to the latest destination while
|
||||||
for room_id in &get_room_ids() {
|
// preserving the original from_display (AC2).
|
||||||
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
|
pending_transitions
|
||||||
slog!(
|
.entry(item_id.clone())
|
||||||
"[bot] Failed to send notification to {room_id}: {e}"
|
.and_modify(|e| {
|
||||||
);
|
e.1 = stage.clone();
|
||||||
}
|
if story_name.is_some() {
|
||||||
|
e.2 = story_name.clone();
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
.or_insert_with(|| {
|
||||||
|
(from_display.to_string(), stage.clone(), story_name)
|
||||||
|
});
|
||||||
|
|
||||||
|
// Start or extend the debounce window.
|
||||||
|
flush_deadline =
|
||||||
|
Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE);
|
||||||
}
|
}
|
||||||
Ok(WatcherEvent::MergeFailure {
|
Ok(WatcherEvent::MergeFailure {
|
||||||
ref story_id,
|
ref story_id,
|
||||||
@@ -362,6 +417,28 @@ pub fn spawn_notification_listener(
|
|||||||
slog!(
|
slog!(
|
||||||
"[bot] Watcher channel closed, stopping notification listener"
|
"[bot] Watcher channel closed, stopping notification listener"
|
||||||
);
|
);
|
||||||
|
// Flush any coalesced transitions that haven't fired yet.
|
||||||
|
for (item_id, (from_display, to_stage_key, story_name)) in
|
||||||
|
pending_transitions.drain()
|
||||||
|
{
|
||||||
|
let to_display = stage_display_name(&to_stage_key);
|
||||||
|
let (plain, html) = format_stage_notification(
|
||||||
|
&item_id,
|
||||||
|
story_name.as_deref(),
|
||||||
|
&from_display,
|
||||||
|
to_display,
|
||||||
|
);
|
||||||
|
slog!("[bot] Sending stage notification: {plain}");
|
||||||
|
for room_id in &get_room_ids() {
|
||||||
|
if let Err(e) =
|
||||||
|
transport.send_message(room_id, &plain, &html).await
|
||||||
|
{
|
||||||
|
slog!(
|
||||||
|
"[bot] Failed to send notification to {room_id}: {e}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -550,9 +627,12 @@ mod tests {
|
|||||||
item_id: "10_story_foo".to_string(),
|
item_id: "10_story_foo".to_string(),
|
||||||
action: "qa".to_string(),
|
action: "qa".to_string(),
|
||||||
commit_msg: "storkit: qa 10_story_foo".to_string(),
|
commit_msg: "storkit: qa 10_story_foo".to_string(),
|
||||||
|
from_stage: None,
|
||||||
}).unwrap();
|
}).unwrap();
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
// Wait longer than STAGE_TRANSITION_DEBOUNCE (200ms) so the coalesced
|
||||||
|
// notification flushes.
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(350)).await;
|
||||||
|
|
||||||
let calls = calls.lock().unwrap();
|
let calls = calls.lock().unwrap();
|
||||||
assert_eq!(calls.len(), 1, "Should deliver to the dynamically added room");
|
assert_eq!(calls.len(), 1, "Should deliver to the dynamically added room");
|
||||||
@@ -582,6 +662,7 @@ mod tests {
|
|||||||
item_id: "10_story_foo".to_string(),
|
item_id: "10_story_foo".to_string(),
|
||||||
action: "qa".to_string(),
|
action: "qa".to_string(),
|
||||||
commit_msg: "storkit: qa 10_story_foo".to_string(),
|
commit_msg: "storkit: qa 10_story_foo".to_string(),
|
||||||
|
from_stage: None,
|
||||||
}).unwrap();
|
}).unwrap();
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
|||||||
@@ -168,6 +168,7 @@ impl From<WatcherEvent> for Option<WsResponse> {
|
|||||||
item_id,
|
item_id,
|
||||||
action,
|
action,
|
||||||
commit_msg,
|
commit_msg,
|
||||||
|
..
|
||||||
} => Some(WsResponse::WorkItemChanged {
|
} => Some(WsResponse::WorkItemChanged {
|
||||||
stage,
|
stage,
|
||||||
item_id,
|
item_id,
|
||||||
@@ -901,6 +902,7 @@ mod tests {
|
|||||||
item_id: "42_story_foo".to_string(),
|
item_id: "42_story_foo".to_string(),
|
||||||
action: "start".to_string(),
|
action: "start".to_string(),
|
||||||
commit_msg: "storkit: start 42_story_foo".to_string(),
|
commit_msg: "storkit: start 42_story_foo".to_string(),
|
||||||
|
from_stage: None,
|
||||||
};
|
};
|
||||||
let ws_msg: Option<WsResponse> = evt.into();
|
let ws_msg: Option<WsResponse> = evt.into();
|
||||||
let ws_msg = ws_msg.expect("WorkItem should produce Some");
|
let ws_msg = ws_msg.expect("WorkItem should produce Some");
|
||||||
@@ -1380,6 +1382,7 @@ mod tests {
|
|||||||
item_id: "99_story_test".to_string(),
|
item_id: "99_story_test".to_string(),
|
||||||
action: "start".to_string(),
|
action: "start".to_string(),
|
||||||
commit_msg: "storkit: start 99_story_test".to_string(),
|
commit_msg: "storkit: start 99_story_test".to_string(),
|
||||||
|
from_stage: None,
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|||||||
@@ -44,6 +44,9 @@ pub enum WatcherEvent {
|
|||||||
action: String,
|
action: String,
|
||||||
/// The deterministic git commit message used (or that would have been used).
|
/// The deterministic git commit message used (or that would have been used).
|
||||||
commit_msg: String,
|
commit_msg: String,
|
||||||
|
/// The pipeline stage the item moved FROM, populated for move operations.
|
||||||
|
/// `None` for creations, deletions, or synthetic events.
|
||||||
|
from_stage: Option<String>,
|
||||||
},
|
},
|
||||||
/// `.storkit/project.toml` was modified at the project root (not inside a worktree).
|
/// `.storkit/project.toml` was modified at the project root (not inside a worktree).
|
||||||
ConfigChanged,
|
ConfigChanged,
|
||||||
@@ -276,12 +279,27 @@ fn flush_pending(
|
|||||||
slog!("[watcher] flush (broadcast-only): {commit_msg}");
|
slog!("[watcher] flush (broadcast-only): {commit_msg}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For move operations, find the source stage from deleted entries with matching item_id.
|
||||||
|
let from_stage: Option<String> = if !additions.is_empty() {
|
||||||
|
pending
|
||||||
|
.iter()
|
||||||
|
.filter(|(path, _)| !path.exists())
|
||||||
|
.find(|(path, _)| {
|
||||||
|
path.file_stem()
|
||||||
|
.and_then(|s| s.to_str()) == Some(item_id.as_str())
|
||||||
|
})
|
||||||
|
.map(|(_, stage)| stage.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
// Always broadcast the event so connected WebSocket clients stay in sync.
|
// Always broadcast the event so connected WebSocket clients stay in sync.
|
||||||
let evt = WatcherEvent::WorkItem {
|
let evt = WatcherEvent::WorkItem {
|
||||||
stage: dest_stage.to_string(),
|
stage: dest_stage.to_string(),
|
||||||
item_id,
|
item_id,
|
||||||
action: action.to_string(),
|
action: action.to_string(),
|
||||||
commit_msg,
|
commit_msg,
|
||||||
|
from_stage,
|
||||||
};
|
};
|
||||||
let _ = event_tx.send(evt);
|
let _ = event_tx.send(evt);
|
||||||
}
|
}
|
||||||
@@ -623,6 +641,7 @@ mod tests {
|
|||||||
item_id,
|
item_id,
|
||||||
action,
|
action,
|
||||||
commit_msg,
|
commit_msg,
|
||||||
|
..
|
||||||
} => {
|
} => {
|
||||||
assert_eq!(stage, "1_backlog");
|
assert_eq!(stage, "1_backlog");
|
||||||
assert_eq!(item_id, "42_story_foo");
|
assert_eq!(item_id, "42_story_foo");
|
||||||
@@ -667,6 +686,7 @@ mod tests {
|
|||||||
item_id,
|
item_id,
|
||||||
action,
|
action,
|
||||||
commit_msg,
|
commit_msg,
|
||||||
|
..
|
||||||
} => {
|
} => {
|
||||||
assert_eq!(stage, "2_current");
|
assert_eq!(stage, "2_current");
|
||||||
assert_eq!(item_id, "42_story_foo");
|
assert_eq!(item_id, "42_story_foo");
|
||||||
@@ -922,6 +942,75 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── flush_pending from_stage ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// AC3: when a pending map contains both a deletion (source stage) and a
|
||||||
|
/// creation (dest stage) for the same item_id, the broadcast event should
|
||||||
|
/// have `from_stage` set to the source stage key.
|
||||||
|
#[test]
|
||||||
|
fn flush_pending_sets_from_stage_for_move_operations() {
|
||||||
|
let tmp = TempDir::new().unwrap();
|
||||||
|
init_git_repo(tmp.path());
|
||||||
|
|
||||||
|
// Destination exists (file moved here).
|
||||||
|
let merge_dir = make_stage_dir(tmp.path(), "4_merge");
|
||||||
|
let merge_path = merge_dir.join("42_story_foo.md");
|
||||||
|
fs::write(&merge_path, "---\nname: test\n---\n").unwrap();
|
||||||
|
|
||||||
|
// Source path does NOT exist (file was moved away).
|
||||||
|
make_stage_dir(tmp.path(), "3_qa");
|
||||||
|
let qa_path = tmp
|
||||||
|
.path()
|
||||||
|
.join(".storkit")
|
||||||
|
.join("work")
|
||||||
|
.join("3_qa")
|
||||||
|
.join("42_story_foo.md");
|
||||||
|
|
||||||
|
let (tx, mut rx) = tokio::sync::broadcast::channel(16);
|
||||||
|
let mut pending = HashMap::new();
|
||||||
|
pending.insert(merge_path, "4_merge".to_string()); // addition
|
||||||
|
pending.insert(qa_path, "3_qa".to_string()); // deletion
|
||||||
|
|
||||||
|
flush_pending(&pending, tmp.path(), &tx);
|
||||||
|
|
||||||
|
let evt = rx.try_recv().expect("expected event");
|
||||||
|
match evt {
|
||||||
|
WatcherEvent::WorkItem {
|
||||||
|
stage, from_stage, ..
|
||||||
|
} => {
|
||||||
|
assert_eq!(stage, "4_merge");
|
||||||
|
assert_eq!(from_stage, Some("3_qa".to_string()));
|
||||||
|
}
|
||||||
|
other => panic!("unexpected event: {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC3: when a pending map has only an addition (creation, not a move),
|
||||||
|
/// `from_stage` should be `None`.
|
||||||
|
#[test]
|
||||||
|
fn flush_pending_sets_from_stage_to_none_for_creations() {
|
||||||
|
let tmp = TempDir::new().unwrap();
|
||||||
|
init_git_repo(tmp.path());
|
||||||
|
|
||||||
|
let stage_dir = make_stage_dir(tmp.path(), "2_current");
|
||||||
|
let story_path = stage_dir.join("55_story_new.md");
|
||||||
|
fs::write(&story_path, "---\nname: New Story\n---\n").unwrap();
|
||||||
|
|
||||||
|
let (tx, mut rx) = tokio::sync::broadcast::channel(16);
|
||||||
|
let mut pending = HashMap::new();
|
||||||
|
pending.insert(story_path, "2_current".to_string());
|
||||||
|
|
||||||
|
flush_pending(&pending, tmp.path(), &tx);
|
||||||
|
|
||||||
|
let evt = rx.try_recv().expect("expected event");
|
||||||
|
match evt {
|
||||||
|
WatcherEvent::WorkItem { from_stage, .. } => {
|
||||||
|
assert_eq!(from_stage, None, "creation should have no from_stage");
|
||||||
|
}
|
||||||
|
other => panic!("unexpected event: {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ── stage_for_path (additional edge cases) ────────────────────────────────
|
// ── stage_for_path (additional edge cases) ────────────────────────────────
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
Reference in New Issue
Block a user