diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index f31d35e9..f7adcbf6 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -280,6 +280,74 @@ mod tests { } } + /// Story 736 — Criterion 3 end-to-end: fire N status events at an idle + /// agent session; verify the agent is not invoked; then drain and format the + /// buffer to simulate what `start_agent` does on the next user message; + /// assert all N events appear in the `` block and the buffer + /// is empty afterwards. + #[tokio::test] + async fn drain_and_format_prepends_all_buffered_events_and_empties_buffer() { + use crate::service::status::buffer::BufferedItem; + use crate::service::status::buffer::format_buffered_items; + + let pool = AgentPool::new_test(3207); + // Idle agent with a live buffer — simulates a previous session that is + // now completed but has been accumulating pipeline events. + pool.inject_test_agent_with_live_buffer("story-736", "coder", AgentStatus::Completed); + + // Fire N status events at the pool's broadcaster while the agent is idle. + // The agent must NOT be invoked — events only accumulate in the buffer. + let bc = pool.status_broadcaster(); + let n = 4_usize; + for i in 0..n { + bc.publish(StatusEvent::MergeFailure { + story_id: format!("other-{i}"), + story_name: None, + reason: format!("reason-{i}"), + }); + } + // Yield so the background receive tasks can process all events. + sleep(Duration::from_millis(30)).await; + + // Drain the buffer — this is what start_agent does before spawning the + // next agent turn (story 736, criterion 1). + let items = pool + .drain_agent_status_buffer("story-736", "coder") + .expect("agent must have a buffer"); + assert_eq!(items.len(), n, "all {n} events must be buffered"); + assert!( + items.iter().all(|i| matches!(i, BufferedItem::Event(_))), + "all items must be events (no truncation)" + ); + + // Format into a block (criterion 1). + let block = format_buffered_items(&items).expect("non-empty slice must yield a block"); + assert!( + block.starts_with("\n"), + "block must open with : {block}" + ); + assert!( + block.ends_with("\n"), + "block must close with : {block}" + ); + for i in 0..n { + assert!( + block.contains(&format!("reason-{i}")), + "block must include content from event {i}: {block}" + ); + } + + // After drain the buffer must be empty (criterion 2). + let after = pool + .drain_agent_status_buffer("story-736", "coder") + .expect("agent must still have a buffer"); + assert!( + after.is_empty(), + "buffer must be empty after drain (got {} items)", + after.len() + ); + } + /// Criterion 2: on session restart a fresh buffer is created, clearing any /// events accumulated in the previous session. /// diff --git a/server/src/agents/pool/start/mod.rs b/server/src/agents/pool/start/mod.rs index 314f59cf..ee53abc2 100644 --- a/server/src/agents/pool/start/mod.rs +++ b/server/src/agents/pool/start/mod.rs @@ -100,6 +100,11 @@ impl AgentPool { // (bug 118). let resolved_name: String; let key: String; + // Buffered status events accumulated while the agent was idle. Drained + // inside the lock (before the new entry replaces the old one) and + // formatted as a `` block for prepending to the first + // agent turn (story 736). + let prior_events: Option; { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; @@ -249,6 +254,17 @@ impl AgentPool { story '{story_id}' will be picked up when the agent becomes available" )); } + // Drain accumulated status events from the previous session before + // replacing the entry with the new one. The drained items are + // formatted and prepended to the first agent turn (story 736). + prior_events = { + let items = agents + .get(&key) + .and_then(|a| a.status_buffer.as_ref().map(|b| b.drain())) + .unwrap_or_default(); + crate::service::status::buffer::format_buffered_items(&items) + }; + agents.insert( key.clone(), StoryAgent { @@ -336,6 +352,7 @@ impl AgentPool { self.watcher_tx.clone(), Arc::clone(&self.merge_jobs), inactivity_timeout_secs, + prior_events, )); // Store the task handle while the agent is still Pending. diff --git a/server/src/agents/pool/start/spawn.rs b/server/src/agents/pool/start/spawn.rs index 8eefeec5..ae998db6 100644 --- a/server/src/agents/pool/start/spawn.rs +++ b/server/src/agents/pool/start/spawn.rs @@ -48,6 +48,11 @@ pub(super) async fn run_agent_spawn( watcher_tx: broadcast::Sender, merge_jobs: Arc>>, inactivity_timeout_secs: u64, + // Formatted `` block drained from the previous session's + // buffer. Prepended to the first agent turn so the agent sees what + // happened while it was idle (story 736). `None` when there were no + // buffered events. + buffered_events_block: Option, ) { // Re-bind to the legacy `_clone` / `_owned` names so the body below remains // a verbatim copy of the original closure (story 157). @@ -169,6 +174,13 @@ pub(super) async fn run_agent_spawn( } }; + // Prepend buffered pipeline events from the previous idle period so the + // agent sees what happened while it was not running (story 736). + let effective_prompt = match buffered_events_block { + Some(block) if !block.is_empty() => format!("{block}\n\n{effective_prompt}"), + _ => effective_prompt, + }; + // Step 3: transition to Running now that the worktree is ready. { if let Ok(mut agents) = agents_ref.lock() diff --git a/server/src/service/status/buffer.rs b/server/src/service/status/buffer.rs index 971d271a..b1de6829 100644 --- a/server/src/service/status/buffer.rs +++ b/server/src/service/status/buffer.rs @@ -1,7 +1,5 @@ //! [`StatusEventBuffer`] — bounded, per-instance accumulator over a //! [`StatusBroadcaster`]. -// Infrastructure module: public items are not yet wired to production call -// sites but are exercised by the unit tests below. #![allow(dead_code)] //! //! A `StatusEventBuffer` subscribes to a [`StatusBroadcaster`] on construction @@ -162,6 +160,40 @@ impl Drop for StatusEventBuffer { } } +// ── Formatting ──────────────────────────────────────────────────────────────── + +/// Format a [`BufferedItem`] slice into a `` block +/// ready to prepend to an agent message. +/// +/// Each [`BufferedItem::Event`] is rendered with +/// [`format_status_event`](crate::service::status::format::format_status_event). +/// A [`BufferedItem::Truncated`] entry produces a note about how many older +/// events were omitted. +/// +/// Returns `None` when `items` is empty so callers can skip the prepend +/// entirely. +pub fn format_buffered_items(items: &[BufferedItem]) -> Option { + if items.is_empty() { + return None; + } + use crate::service::status::format::format_status_event; + let mut lines = Vec::with_capacity(items.len()); + for item in items { + match item { + BufferedItem::Truncated(n) => { + lines.push(format!("[{n} older event(s) were omitted due to overflow]")); + } + BufferedItem::Event(event) => { + lines.push(format_status_event(event)); + } + } + } + Some(format!( + "\n{}\n", + lines.join("\n") + )) +} + // ── Tests ───────────────────────────────────────────────────────────────────── #[cfg(test)] @@ -353,4 +385,55 @@ mod tests { "clear should discard events and truncation marker" ); } + + // ── format_buffered_items ───────────────────────────────────────────────── + + #[test] + fn format_empty_slice_returns_none() { + assert!(format_buffered_items(&[]).is_none()); + } + + #[test] + fn format_events_produces_recent_events_block() { + let items = vec![ + BufferedItem::Event(StatusEvent::MergeFailure { + story_id: "42_story_foo".to_string(), + story_name: Some("Foo".to_string()), + reason: "conflict".to_string(), + }), + BufferedItem::Event(StatusEvent::StoryBlocked { + story_id: "7_story_bar".to_string(), + story_name: None, + reason: "retry limit".to_string(), + }), + ]; + let block = format_buffered_items(&items).expect("non-empty slice must produce a block"); + assert!( + block.starts_with("\n"), + "must open with : {block}" + ); + assert!( + block.ends_with("\n"), + "must close with : {block}" + ); + assert!(block.contains("conflict"), "must include event content"); + assert!(block.contains("retry limit"), "must include event content"); + } + + #[test] + fn format_truncation_marker_is_included() { + let items = vec![ + BufferedItem::Truncated(3), + BufferedItem::Event(StatusEvent::MergeFailure { + story_id: "1_story_x".to_string(), + story_name: None, + reason: "test".to_string(), + }), + ]; + let block = format_buffered_items(&items).unwrap(); + assert!( + block.contains("3 older event(s) were omitted"), + "truncation note must appear in block: {block}" + ); + } }