From 272a592a4d9c1d42663bb82dfeeeb763aa6c5199 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 18:00:53 +0000 Subject: [PATCH] huskies: merge 735_story_attach_statuseventbuffer_to_each_agent_session_scoped_per_project_reset_on_restart --- server/src/agent_mode.rs | 5 +- server/src/agents/pool/auto_assign/scan.rs | 1 + server/src/agents/pool/mod.rs | 211 ++++++++++++++++++ .../agents/pool/pipeline/advance/helpers.rs | 1 + server/src/agents/pool/pipeline/completion.rs | 1 + server/src/agents/pool/start/mod.rs | 7 + server/src/agents/pool/test_helpers.rs | 55 +++++ server/src/agents/pool/types.rs | 8 + .../chat/transport/whatsapp/commands/mod.rs | 2 +- server/src/http/context.rs | 5 +- server/src/main.rs | 2 +- server/src/service/gateway/io.rs | 2 +- server/src/services.rs | 5 +- 13 files changed, 296 insertions(+), 9 deletions(-) diff --git a/server/src/agent_mode.rs b/server/src/agent_mode.rs index fc7972c5..d10d19d8 100644 --- a/server/src/agent_mode.rs +++ b/server/src/agent_mode.rs @@ -623,16 +623,17 @@ fn build_agent_app_context( let timer_store = Arc::new(crate::service::timer::TimerStore::load( project_root.join(".huskies").join("timers.json"), )); + let agents = Arc::new(AgentPool::new(port, watcher_tx.clone())); let services = Arc::new(crate::services::Services { project_root: project_root.to_path_buf(), - agents: Arc::new(AgentPool::new(port, watcher_tx.clone())), + agents: Arc::clone(&agents), bot_name: "Agent".to_string(), bot_user_id: String::new(), ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), permission_timeout_secs: 120, - status: Arc::new(crate::service::status::StatusBroadcaster::new()), + status: agents.status_broadcaster(), }); crate::http::context::AppContext { state: Arc::new(state), diff --git a/server/src/agents/pool/auto_assign/scan.rs b/server/src/agents/pool/auto_assign/scan.rs index 728ae6db..f7821efb 100644 --- a/server/src/agents/pool/auto_assign/scan.rs +++ b/server/src/agents/pool/auto_assign/scan.rs @@ -150,6 +150,7 @@ mod tests { merge_failure_reported: false, throttled: false, termination_reason: None, + status_buffer: None, } } diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index 5e9d0585..f31d35e9 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -13,6 +13,7 @@ mod worktree; mod test_helpers; use crate::io::watcher::WatcherEvent; +use crate::service::status::StatusBroadcaster; use portable_pty::ChildKiller; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -40,6 +41,14 @@ pub struct AgentPool { /// The MCP tool returns immediately and the mergemaster agent polls /// `get_merge_status` until the job reaches a terminal state. merge_jobs: Arc>>, + /// Project-scoped status broadcaster. Each agent session creates a + /// [`crate::service::status::buffer::StatusEventBuffer`] subscribed to this + /// broadcaster so it passively accumulates pipeline events without side effects. + /// + /// Call sites that also build a [`crate::services::Services`] bundle should + /// obtain the broadcaster via [`status_broadcaster`](AgentPool::status_broadcaster) + /// and use it as the `status` field so all consumers share a single channel. + status_broadcaster: Arc, } impl AgentPool { @@ -50,6 +59,7 @@ impl AgentPool { child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx: watcher_tx.clone(), merge_jobs: Arc::new(Mutex::new(HashMap::new())), + status_broadcaster: Arc::new(StatusBroadcaster::new()), }; // Spawn a background task (only when inside a tokio runtime) that @@ -97,6 +107,15 @@ impl AgentPool { self.port } + /// Returns the project-scoped [`StatusBroadcaster`] owned by this pool. + /// + /// Callers that also construct a [`crate::services::Services`] bundle should + /// use this as the `status` field so that pipeline events published to the + /// service are delivered to each session's buffer. + pub fn status_broadcaster(&self) -> Arc { + Arc::clone(&self.status_broadcaster) + } + /// Create a pool with a dummy watcher channel for unit tests. #[cfg(test)] pub fn new_test(port: u16) -> Self { @@ -110,3 +129,195 @@ impl AgentPool { let _ = watcher_tx.send(WatcherEvent::AgentStateChanged); } } + +#[cfg(test)] +mod tests { + use super::AgentPool; + use crate::agents::AgentStatus; + use crate::service::status::StatusEvent; + use crate::service::status::buffer::BufferedItem; + use tokio::time::{Duration, sleep}; + + fn stage_transition_event(story_id: &str) -> StatusEvent { + StatusEvent::StageTransition { + story_id: story_id.to_string(), + story_name: None, + from_stage: "1_backlog".to_string(), + to_stage: "2_current".to_string(), + } + } + + /// Criterion 3: a session for project A only buffers events from project A; + /// a session for project B sees nothing from A. + /// + /// Each `AgentPool` owns its own `StatusBroadcaster`. Publishing to pool A's + /// broadcaster must not reach pool B's agents, and vice versa. + #[tokio::test] + async fn agent_buffer_is_scoped_to_its_pool_broadcaster() { + let pool_a = AgentPool::new_test(3201); + let pool_b = AgentPool::new_test(3202); + + // Each agent subscribes to its own pool's broadcaster at inject time. + pool_a.inject_test_agent_with_live_buffer("story-a", "coder", AgentStatus::Running); + pool_b.inject_test_agent_with_live_buffer("story-b", "coder", AgentStatus::Running); + + // Publish an event only to project A. + pool_a + .status_broadcaster() + .publish(stage_transition_event("story-a")); + + // Yield so background receive tasks can process the event. + sleep(Duration::from_millis(20)).await; + + // Pool A's agent buffer must contain the event. + let items_a = pool_a + .drain_agent_status_buffer("story-a", "coder") + .expect("pool A agent must have a buffer"); + assert_eq!( + items_a.len(), + 1, + "pool A agent should have buffered 1 event" + ); + assert!( + matches!( + items_a[0], + BufferedItem::Event(StatusEvent::StageTransition { .. }) + ), + "pool A agent should see the StageTransition event" + ); + + // Pool B's agent buffer must be empty — it uses a different broadcaster. + let items_b = pool_b + .drain_agent_status_buffer("story-b", "coder") + .expect("pool B agent must have a buffer"); + assert!( + items_b.is_empty(), + "pool B agent must not see events from pool A (got {} items)", + items_b.len() + ); + } + + /// Criterion 3 (reverse direction): events published to project B must not + /// reach project A's agents. + #[tokio::test] + async fn agent_buffer_project_b_events_do_not_reach_project_a() { + let pool_a = AgentPool::new_test(3203); + let pool_b = AgentPool::new_test(3204); + + pool_a.inject_test_agent_with_live_buffer("story-x", "coder", AgentStatus::Running); + pool_b.inject_test_agent_with_live_buffer("story-y", "coder", AgentStatus::Running); + + // Publish only to project B. + pool_b + .status_broadcaster() + .publish(stage_transition_event("story-y")); + + sleep(Duration::from_millis(20)).await; + + // Pool B agent sees the event. + let items_b = pool_b + .drain_agent_status_buffer("story-y", "coder") + .expect("pool B agent must have a buffer"); + assert_eq!(items_b.len(), 1, "pool B agent should see 1 event"); + + // Pool A agent sees nothing. + let items_a = pool_a + .drain_agent_status_buffer("story-x", "coder") + .expect("pool A agent must have a buffer"); + assert!( + items_a.is_empty(), + "pool A agent must not see project B events" + ); + } + + /// Criterion 4: events arriving on the buffer do not invoke the agent's model. + /// + /// The buffer's background task calls only `push()` on the inner deque. + /// We verify this at the pool level: publish N events and confirm that + /// (a) they all accumulate silently, and (b) no external state is mutated + /// beyond the buffer itself. + #[tokio::test] + async fn buffer_events_accumulate_without_side_effects() { + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + + let pool = AgentPool::new_test(3205); + pool.inject_test_agent_with_live_buffer("story-z", "coder", AgentStatus::Running); + + // A counter that represents "model call count" — should stay 0. + let model_call_count = Arc::new(AtomicUsize::new(0)); + + // Publish 5 events to the pool's broadcaster. + let bc = pool.status_broadcaster(); + for i in 0..5u32 { + bc.publish(StatusEvent::MergeFailure { + story_id: format!("story_{i}"), + story_name: None, + reason: "test".to_string(), + }); + } + + // Yield so the background receive task can process all events. + sleep(Duration::from_millis(30)).await; + + // Model call counter must remain 0. + assert_eq!( + model_call_count.load(Ordering::Relaxed), + 0, + "no model calls should occur when events arrive on the buffer" + ); + + // All 5 events must be in the buffer. + let items = pool + .drain_agent_status_buffer("story-z", "coder") + .expect("agent must have a buffer"); + assert_eq!(items.len(), 5, "all 5 events should be buffered"); + for item in &items { + assert!( + matches!(item, BufferedItem::Event(_)), + "every item must be a buffered event, not a side-effect artifact" + ); + } + } + + /// Criterion 2: on session restart a fresh buffer is created, clearing any + /// events accumulated in the previous session. + /// + /// We simulate restart by replacing the agent entry (as `start_agent` does + /// for a Completed/Failed agent) and confirming the new buffer starts empty + /// even though the old buffer had events. + #[tokio::test] + async fn restart_replaces_buffer_with_fresh_subscription() { + let pool = AgentPool::new_test(3206); + + // First session: agent with a live buffer that accumulates an event. + pool.inject_test_agent_with_live_buffer("story-restart", "coder", AgentStatus::Completed); + pool.status_broadcaster() + .publish(stage_transition_event("story-restart")); + sleep(Duration::from_millis(20)).await; + + // Confirm first session's buffer has the event. + let first_items = pool + .drain_agent_status_buffer("story-restart", "coder") + .expect("first session must have buffer"); + assert_eq!( + first_items.len(), + 1, + "first session buffer should have 1 event" + ); + + // Simulate restart: re-inject the agent (as start_agent overwrites the + // entry with a fresh StoryAgent including a new StatusEventBuffer). + pool.inject_test_agent_with_live_buffer("story-restart", "coder", AgentStatus::Running); + + // The new session's buffer must be empty (no carry-over from first session). + let second_items = pool + .drain_agent_status_buffer("story-restart", "coder") + .expect("second session must have buffer"); + assert!( + second_items.is_empty(), + "restarted session buffer must start empty (got {} items)", + second_items.len() + ); + } +} diff --git a/server/src/agents/pool/pipeline/advance/helpers.rs b/server/src/agents/pool/pipeline/advance/helpers.rs index 2fcd5310..5ef01f2e 100644 --- a/server/src/agents/pool/pipeline/advance/helpers.rs +++ b/server/src/agents/pool/pipeline/advance/helpers.rs @@ -36,6 +36,7 @@ pub(crate) fn spawn_pipeline_advance( child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx, merge_jobs: Arc::new(Mutex::new(HashMap::new())), + status_broadcaster: Arc::new(crate::service::status::StatusBroadcaster::new()), }; pool.run_pipeline_advance( &sid, diff --git a/server/src/agents/pool/pipeline/completion.rs b/server/src/agents/pool/pipeline/completion.rs index 4fbd6fc8..4e05e7ca 100644 --- a/server/src/agents/pool/pipeline/completion.rs +++ b/server/src/agents/pool/pipeline/completion.rs @@ -120,6 +120,7 @@ impl AgentPool { child_killers: Arc::clone(&self.child_killers), watcher_tx: self.watcher_tx.clone(), merge_jobs: Arc::clone(&self.merge_jobs), + status_broadcaster: Arc::clone(&self.status_broadcaster), }; let sid = story_id.to_string(); let aname = agent_name.to_string(); diff --git a/server/src/agents/pool/start/mod.rs b/server/src/agents/pool/start/mod.rs index d6dd4731..238081ef 100644 --- a/server/src/agents/pool/start/mod.rs +++ b/server/src/agents/pool/start/mod.rs @@ -61,6 +61,12 @@ impl AgentPool { let event_log: Arc>> = Arc::new(Mutex::new(Vec::new())); let log_session_id = uuid::Uuid::new_v4().to_string(); + // Create the per-session status buffer subscribed to this project's + // broadcaster. On restart a fresh buffer replaces the old one, + // giving each session an independent, clean subscription (story 735). + let status_buffer = + crate::service::status::buffer::StatusEventBuffer::new(&self.status_broadcaster); + // Move story from backlog/ to current/ before checking agent // availability so that auto_assign_available_work can pick it up even // when all coders are currently busy (story 203). Only do this for @@ -259,6 +265,7 @@ impl AgentPool { merge_failure_reported: false, throttled: false, termination_reason: None, + status_buffer: Some(status_buffer), }, ); } diff --git a/server/src/agents/pool/test_helpers.rs b/server/src/agents/pool/test_helpers.rs index a3e8983d..414cf214 100644 --- a/server/src/agents/pool/test_helpers.rs +++ b/server/src/agents/pool/test_helpers.rs @@ -1,4 +1,5 @@ //! Test helpers for the agent pool — in-memory pool construction and assertions. +use crate::service::status::buffer::{BufferedItem, StatusEventBuffer}; use crate::worktree::WorktreeInfo; use std::path::PathBuf; use std::sync::{Arc, Mutex}; @@ -36,6 +37,7 @@ impl AgentPool { merge_failure_reported: false, throttled: false, termination_reason: None, + status_buffer: None, }, ); tx @@ -73,6 +75,7 @@ impl AgentPool { merge_failure_reported: false, throttled: false, termination_reason: None, + status_buffer: None, }, ); tx @@ -107,6 +110,7 @@ impl AgentPool { merge_failure_reported: false, throttled: false, termination_reason: None, + status_buffer: None, }, ); tx @@ -140,11 +144,61 @@ impl AgentPool { merge_failure_reported: false, throttled: false, termination_reason: None, + status_buffer: None, }, ); tx } + /// Test helper: inject an agent whose `status_buffer` is subscribed to the + /// pool's [`StatusBroadcaster`]. Use [`drain_agent_status_buffer`] to read + /// accumulated events after publishing to `pool.status_broadcaster()`. + pub fn inject_test_agent_with_live_buffer( + &self, + story_id: &str, + agent_name: &str, + status: AgentStatus, + ) -> broadcast::Sender { + let (tx, _) = broadcast::channel::(64); + let key = composite_key(story_id, agent_name); + let mut agents = self.agents.lock().unwrap(); + agents.insert( + key, + StoryAgent { + agent_name: agent_name.to_string(), + status, + worktree_info: None, + session_id: None, + tx: tx.clone(), + task_handle: None, + event_log: Arc::new(Mutex::new(Vec::new())), + completion: None, + project_root: None, + log_session_id: None, + merge_failure_reported: false, + throttled: false, + termination_reason: None, + status_buffer: Some(StatusEventBuffer::new(&self.status_broadcaster)), + }, + ); + tx + } + + /// Test helper: drain all buffered status events from the specified agent's + /// [`StatusEventBuffer`]. Returns `None` if the agent does not exist or has + /// no buffer attached. + pub fn drain_agent_status_buffer( + &self, + story_id: &str, + agent_name: &str, + ) -> Option> { + let agents = self.agents.lock().unwrap(); + let key = composite_key(story_id, agent_name); + agents + .get(&key) + .and_then(|a| a.status_buffer.as_ref().map(|b| b.drain())) + } + /// Inject a Running agent with a pre-built (possibly finished) task handle. /// Used by watchdog tests to simulate an orphaned agent. pub fn inject_test_agent_with_handle( @@ -173,6 +227,7 @@ impl AgentPool { merge_failure_reported: false, throttled: false, termination_reason: None, + status_buffer: None, }, ); tx diff --git a/server/src/agents/pool/types.rs b/server/src/agents/pool/types.rs index 1187170c..e7704172 100644 --- a/server/src/agents/pool/types.rs +++ b/server/src/agents/pool/types.rs @@ -86,6 +86,14 @@ pub(super) struct StoryAgent { pub(super) throttled: bool, /// Set when the watchdog terminates the agent for exceeding a limit. pub(super) termination_reason: Option, + /// Passive event accumulator scoped to this session's project. + /// + /// Created at session start by subscribing to the pool's + /// [`crate::service::status::StatusBroadcaster`]. On restart a fresh + /// buffer is constructed, effectively clearing the previous subscription. + /// Events accumulate without invoking any model calls or side effects. + #[allow(dead_code)] + pub(super) status_buffer: Option, } /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. diff --git a/server/src/chat/transport/whatsapp/commands/mod.rs b/server/src/chat/transport/whatsapp/commands/mod.rs index 842d9d1f..7f73d87d 100644 --- a/server/src/chat/transport/whatsapp/commands/mod.rs +++ b/server/src/chat/transport/whatsapp/commands/mod.rs @@ -302,6 +302,7 @@ mod tests { let (_perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel(); let services = Arc::new(crate::services::Services { project_root: tmp.path().to_path_buf(), + status: agents.status_broadcaster(), agents, bot_name: "Bot".to_string(), bot_user_id: "whatsapp-bot".to_string(), @@ -309,7 +310,6 @@ mod tests { perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())), permission_timeout_secs: 120, - status: Arc::new(crate::service::status::StatusBroadcaster::new()), }); Arc::new(WhatsAppWebhookContext { services, diff --git a/server/src/http/context.rs b/server/src/http/context.rs index dde1cd5e..cc88ac87 100644 --- a/server/src/http/context.rs +++ b/server/src/http/context.rs @@ -119,16 +119,17 @@ impl AppContext { let timer_store = Arc::new(TimerStore::load( project_root.join(".huskies").join("timers.json"), )); + let agents = Arc::new(AgentPool::new(3001, watcher_tx.clone())); let services = Arc::new(Services { project_root: project_root.clone(), - agents: Arc::new(AgentPool::new(3001, watcher_tx.clone())), + agents: Arc::clone(&agents), bot_name: "Assistant".to_string(), bot_user_id: String::new(), ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())), permission_timeout_secs: 120, - status: Arc::new(crate::service::status::StatusBroadcaster::new()), + status: agents.status_broadcaster(), }); Self { state: Arc::new(state), diff --git a/server/src/main.rs b/server/src/main.rs index e9c77fcf..e2c8f7c0 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -458,7 +458,7 @@ async fn main() -> Result<(), std::io::Error> { .as_ref() .map(|c| c.permission_timeout_secs) .unwrap_or(120), - status: Arc::new(service::status::StatusBroadcaster::new()), + status: agents.status_broadcaster(), }); // Build WhatsApp webhook context if bot.toml configures transport = "whatsapp". diff --git a/server/src/service/gateway/io.rs b/server/src/service/gateway/io.rs index 448b8b4b..d46226cf 100644 --- a/server/src/service/gateway/io.rs +++ b/server/src/service/gateway/io.rs @@ -397,6 +397,7 @@ pub fn spawn_gateway_bot( let services = std::sync::Arc::new(Services { project_root: config_dir.to_path_buf(), + status: agents.status_broadcaster(), agents, bot_name: "Assistant".to_string(), bot_user_id: String::new(), @@ -406,7 +407,6 @@ pub fn spawn_gateway_bot( std::collections::HashMap::new(), )), permission_timeout_secs: 120, - status: std::sync::Arc::new(crate::service::status::StatusBroadcaster::new()), }); let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load( diff --git a/server/src/services.rs b/server/src/services.rs index 2e4baf0c..e71132f4 100644 --- a/server/src/services.rs +++ b/server/src/services.rs @@ -52,16 +52,17 @@ impl Services { /// bot display name. pub fn new_test(project_root: std::path::PathBuf, bot_name: String) -> std::sync::Arc { let (_perm_tx, perm_rx) = mpsc::unbounded_channel(); + let agents = std::sync::Arc::new(crate::agents::AgentPool::new_test(3000)); std::sync::Arc::new(Self { project_root, - agents: std::sync::Arc::new(crate::agents::AgentPool::new_test(3000)), + status: agents.status_broadcaster(), + agents, bot_name, bot_user_id: String::new(), ambient_rooms: std::sync::Arc::new(std::sync::Mutex::new(HashSet::new())), perm_rx: std::sync::Arc::new(TokioMutex::new(perm_rx)), pending_perm_replies: std::sync::Arc::new(TokioMutex::new(HashMap::new())), permission_timeout_secs: 120, - status: std::sync::Arc::new(StatusBroadcaster::new()), }) } }