huskies: merge 735_story_attach_statuseventbuffer_to_each_agent_session_scoped_per_project_reset_on_restart

This commit is contained in:
dave
2026-04-27 18:00:53 +00:00
parent d654f55981
commit 272a592a4d
13 changed files with 296 additions and 9 deletions
+3 -2
View File
@@ -623,16 +623,17 @@ fn build_agent_app_context(
let timer_store = Arc::new(crate::service::timer::TimerStore::load( let timer_store = Arc::new(crate::service::timer::TimerStore::load(
project_root.join(".huskies").join("timers.json"), project_root.join(".huskies").join("timers.json"),
)); ));
let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));
let services = Arc::new(crate::services::Services { let services = Arc::new(crate::services::Services {
project_root: project_root.to_path_buf(), project_root: project_root.to_path_buf(),
agents: Arc::new(AgentPool::new(port, watcher_tx.clone())), agents: Arc::clone(&agents),
bot_name: "Agent".to_string(), bot_name: "Agent".to_string(),
bot_user_id: String::new(), bot_user_id: String::new(),
ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
permission_timeout_secs: 120, permission_timeout_secs: 120,
status: Arc::new(crate::service::status::StatusBroadcaster::new()), status: agents.status_broadcaster(),
}); });
crate::http::context::AppContext { crate::http::context::AppContext {
state: Arc::new(state), state: Arc::new(state),
@@ -150,6 +150,7 @@ mod tests {
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: false,
termination_reason: None, termination_reason: None,
status_buffer: None,
} }
} }
+211
View File
@@ -13,6 +13,7 @@ mod worktree;
mod test_helpers; mod test_helpers;
use crate::io::watcher::WatcherEvent; use crate::io::watcher::WatcherEvent;
use crate::service::status::StatusBroadcaster;
use portable_pty::ChildKiller; use portable_pty::ChildKiller;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@@ -40,6 +41,14 @@ pub struct AgentPool {
/// The MCP tool returns immediately and the mergemaster agent polls /// The MCP tool returns immediately and the mergemaster agent polls
/// `get_merge_status` until the job reaches a terminal state. /// `get_merge_status` until the job reaches a terminal state.
merge_jobs: Arc<Mutex<HashMap<String, super::merge::MergeJob>>>, merge_jobs: Arc<Mutex<HashMap<String, super::merge::MergeJob>>>,
/// Project-scoped status broadcaster. Each agent session creates a
/// [`crate::service::status::buffer::StatusEventBuffer`] subscribed to this
/// broadcaster so it passively accumulates pipeline events without side effects.
///
/// Call sites that also build a [`crate::services::Services`] bundle should
/// obtain the broadcaster via [`status_broadcaster`](AgentPool::status_broadcaster)
/// and use it as the `status` field so all consumers share a single channel.
status_broadcaster: Arc<StatusBroadcaster>,
} }
impl AgentPool { impl AgentPool {
@@ -50,6 +59,7 @@ impl AgentPool {
child_killers: Arc::new(Mutex::new(HashMap::new())), child_killers: Arc::new(Mutex::new(HashMap::new())),
watcher_tx: watcher_tx.clone(), watcher_tx: watcher_tx.clone(),
merge_jobs: Arc::new(Mutex::new(HashMap::new())), merge_jobs: Arc::new(Mutex::new(HashMap::new())),
status_broadcaster: Arc::new(StatusBroadcaster::new()),
}; };
// Spawn a background task (only when inside a tokio runtime) that // Spawn a background task (only when inside a tokio runtime) that
@@ -97,6 +107,15 @@ impl AgentPool {
self.port self.port
} }
/// Returns the project-scoped [`StatusBroadcaster`] owned by this pool.
///
/// The pool constructs exactly one broadcaster when it is created, and each
/// agent session's [`crate::service::status::buffer::StatusEventBuffer`]
/// subscribes to it. Callers that also construct a
/// [`crate::services::Services`] bundle should use this handle as the
/// `status` field so that pipeline events published to the service are
/// delivered to each session's buffer over the same shared channel.
///
/// Returns a new `Arc` handle; cloning the `Arc` is a cheap refcount bump,
/// not a copy of the broadcaster.
pub fn status_broadcaster(&self) -> Arc<StatusBroadcaster> {
    Arc::clone(&self.status_broadcaster)
}
/// Create a pool with a dummy watcher channel for unit tests. /// Create a pool with a dummy watcher channel for unit tests.
#[cfg(test)] #[cfg(test)]
pub fn new_test(port: u16) -> Self { pub fn new_test(port: u16) -> Self {
@@ -110,3 +129,195 @@ impl AgentPool {
let _ = watcher_tx.send(WatcherEvent::AgentStateChanged); let _ = watcher_tx.send(WatcherEvent::AgentStateChanged);
} }
} }
#[cfg(test)]
mod tests {
    use super::AgentPool;
    use crate::agents::AgentStatus;
    use crate::service::status::StatusEvent;
    use crate::service::status::buffer::BufferedItem;
    use tokio::time::{Duration, sleep};

    /// Build a minimal `StageTransition` event for `story_id`.
    ///
    /// The stage names are arbitrary fixtures — these tests assert event
    /// delivery and project scoping, not event content.
    fn stage_transition_event(story_id: &str) -> StatusEvent {
        StatusEvent::StageTransition {
            story_id: story_id.to_string(),
            story_name: None,
            from_stage: "1_backlog".to_string(),
            to_stage: "2_current".to_string(),
        }
    }

    /// Criterion 3: a session for project A only buffers events from project A;
    /// a session for project B sees nothing from A.
    ///
    /// Each `AgentPool` owns its own `StatusBroadcaster`. Publishing to pool A's
    /// broadcaster must not reach pool B's agents, and vice versa.
    #[tokio::test]
    async fn agent_buffer_is_scoped_to_its_pool_broadcaster() {
        let pool_a = AgentPool::new_test(3201);
        let pool_b = AgentPool::new_test(3202);

        // Each agent subscribes to its own pool's broadcaster at inject time.
        pool_a.inject_test_agent_with_live_buffer("story-a", "coder", AgentStatus::Running);
        pool_b.inject_test_agent_with_live_buffer("story-b", "coder", AgentStatus::Running);

        // Publish an event only to project A.
        pool_a
            .status_broadcaster()
            .publish(stage_transition_event("story-a"));

        // Yield so background receive tasks can process the event.
        sleep(Duration::from_millis(20)).await;

        // Pool A's agent buffer must contain the event.
        let items_a = pool_a
            .drain_agent_status_buffer("story-a", "coder")
            .expect("pool A agent must have a buffer");
        assert_eq!(
            items_a.len(),
            1,
            "pool A agent should have buffered 1 event"
        );
        assert!(
            matches!(
                items_a[0],
                BufferedItem::Event(StatusEvent::StageTransition { .. })
            ),
            "pool A agent should see the StageTransition event"
        );

        // Pool B's agent buffer must be empty — it uses a different broadcaster.
        let items_b = pool_b
            .drain_agent_status_buffer("story-b", "coder")
            .expect("pool B agent must have a buffer");
        assert!(
            items_b.is_empty(),
            "pool B agent must not see events from pool A (got {} items)",
            items_b.len()
        );
    }

    /// Criterion 3 (reverse direction): events published to project B must not
    /// reach project A's agents.
    #[tokio::test]
    async fn agent_buffer_project_b_events_do_not_reach_project_a() {
        let pool_a = AgentPool::new_test(3203);
        let pool_b = AgentPool::new_test(3204);

        pool_a.inject_test_agent_with_live_buffer("story-x", "coder", AgentStatus::Running);
        pool_b.inject_test_agent_with_live_buffer("story-y", "coder", AgentStatus::Running);

        // Publish only to project B.
        pool_b
            .status_broadcaster()
            .publish(stage_transition_event("story-y"));

        sleep(Duration::from_millis(20)).await;

        // Pool B agent sees the event.
        let items_b = pool_b
            .drain_agent_status_buffer("story-y", "coder")
            .expect("pool B agent must have a buffer");
        assert_eq!(items_b.len(), 1, "pool B agent should see 1 event");

        // Pool A agent sees nothing.
        let items_a = pool_a
            .drain_agent_status_buffer("story-x", "coder")
            .expect("pool A agent must have a buffer");
        assert!(
            items_a.is_empty(),
            "pool A agent must not see project B events"
        );
    }

    /// Criterion 4: events arriving on the buffer do not invoke the agent's
    /// model.
    ///
    /// Passivity is structural — the buffer's background task only `push()`es
    /// onto the inner deque and holds no handle to the model — so this test
    /// pins the observable half of that contract: published events accumulate
    /// silently and every drained item is a plain `BufferedItem::Event`, with
    /// no other artifact produced.
    ///
    /// NOTE(review): an earlier version kept a `model_call_count: AtomicUsize`
    /// here and asserted it stayed 0, but nothing ever incremented it, so the
    /// assertion could never fail; the dead counter has been removed.
    #[tokio::test]
    async fn buffer_events_accumulate_without_side_effects() {
        let pool = AgentPool::new_test(3205);
        pool.inject_test_agent_with_live_buffer("story-z", "coder", AgentStatus::Running);

        // Publish 5 events to the pool's broadcaster.
        let bc = pool.status_broadcaster();
        for i in 0..5u32 {
            bc.publish(StatusEvent::MergeFailure {
                story_id: format!("story_{i}"),
                story_name: None,
                reason: "test".to_string(),
            });
        }

        // Yield so the background receive task can process all events.
        sleep(Duration::from_millis(30)).await;

        // All 5 events must be in the buffer, each a passive event record.
        let items = pool
            .drain_agent_status_buffer("story-z", "coder")
            .expect("agent must have a buffer");
        assert_eq!(items.len(), 5, "all 5 events should be buffered");
        for item in &items {
            assert!(
                matches!(item, BufferedItem::Event(_)),
                "every item must be a buffered event, not a side-effect artifact"
            );
        }
    }

    /// Criterion 2: on session restart a fresh buffer is created, clearing any
    /// events accumulated in the previous session.
    ///
    /// We simulate restart by replacing the agent entry (as `start_agent` does
    /// for a Completed/Failed agent) and confirming the new buffer starts empty
    /// even though the old buffer had events.
    #[tokio::test]
    async fn restart_replaces_buffer_with_fresh_subscription() {
        let pool = AgentPool::new_test(3206);

        // First session: agent with a live buffer that accumulates an event.
        pool.inject_test_agent_with_live_buffer("story-restart", "coder", AgentStatus::Completed);
        pool.status_broadcaster()
            .publish(stage_transition_event("story-restart"));
        sleep(Duration::from_millis(20)).await;

        // Confirm first session's buffer has the event.
        let first_items = pool
            .drain_agent_status_buffer("story-restart", "coder")
            .expect("first session must have buffer");
        assert_eq!(
            first_items.len(),
            1,
            "first session buffer should have 1 event"
        );

        // Simulate restart: re-inject the agent (as start_agent overwrites the
        // entry with a fresh StoryAgent including a new StatusEventBuffer).
        pool.inject_test_agent_with_live_buffer("story-restart", "coder", AgentStatus::Running);

        // The new session's buffer must be empty (no carry-over from first session).
        let second_items = pool
            .drain_agent_status_buffer("story-restart", "coder")
            .expect("second session must have buffer");
        assert!(
            second_items.is_empty(),
            "restarted session buffer must start empty (got {} items)",
            second_items.len()
        );
    }
}
@@ -36,6 +36,7 @@ pub(crate) fn spawn_pipeline_advance(
child_killers: Arc::new(Mutex::new(HashMap::new())), child_killers: Arc::new(Mutex::new(HashMap::new())),
watcher_tx, watcher_tx,
merge_jobs: Arc::new(Mutex::new(HashMap::new())), merge_jobs: Arc::new(Mutex::new(HashMap::new())),
status_broadcaster: Arc::new(crate::service::status::StatusBroadcaster::new()),
}; };
pool.run_pipeline_advance( pool.run_pipeline_advance(
&sid, &sid,
@@ -120,6 +120,7 @@ impl AgentPool {
child_killers: Arc::clone(&self.child_killers), child_killers: Arc::clone(&self.child_killers),
watcher_tx: self.watcher_tx.clone(), watcher_tx: self.watcher_tx.clone(),
merge_jobs: Arc::clone(&self.merge_jobs), merge_jobs: Arc::clone(&self.merge_jobs),
status_broadcaster: Arc::clone(&self.status_broadcaster),
}; };
let sid = story_id.to_string(); let sid = story_id.to_string();
let aname = agent_name.to_string(); let aname = agent_name.to_string();
+7
View File
@@ -61,6 +61,12 @@ impl AgentPool {
let event_log: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new())); let event_log: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
let log_session_id = uuid::Uuid::new_v4().to_string(); let log_session_id = uuid::Uuid::new_v4().to_string();
// Create the per-session status buffer subscribed to this project's
// broadcaster. On restart a fresh buffer replaces the old one,
// giving each session an independent, clean subscription (story 735).
let status_buffer =
crate::service::status::buffer::StatusEventBuffer::new(&self.status_broadcaster);
// Move story from backlog/ to current/ before checking agent // Move story from backlog/ to current/ before checking agent
// availability so that auto_assign_available_work can pick it up even // availability so that auto_assign_available_work can pick it up even
// when all coders are currently busy (story 203). Only do this for // when all coders are currently busy (story 203). Only do this for
@@ -259,6 +265,7 @@ impl AgentPool {
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: false,
termination_reason: None, termination_reason: None,
status_buffer: Some(status_buffer),
}, },
); );
} }
+55
View File
@@ -1,4 +1,5 @@
//! Test helpers for the agent pool — in-memory pool construction and assertions. //! Test helpers for the agent pool — in-memory pool construction and assertions.
use crate::service::status::buffer::{BufferedItem, StatusEventBuffer};
use crate::worktree::WorktreeInfo; use crate::worktree::WorktreeInfo;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@@ -36,6 +37,7 @@ impl AgentPool {
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: false,
termination_reason: None, termination_reason: None,
status_buffer: None,
}, },
); );
tx tx
@@ -73,6 +75,7 @@ impl AgentPool {
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: false,
termination_reason: None, termination_reason: None,
status_buffer: None,
}, },
); );
tx tx
@@ -107,6 +110,7 @@ impl AgentPool {
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: false,
termination_reason: None, termination_reason: None,
status_buffer: None,
}, },
); );
tx tx
@@ -140,11 +144,61 @@ impl AgentPool {
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: false,
termination_reason: None, termination_reason: None,
status_buffer: None,
}, },
); );
tx tx
} }
/// Test helper: inject an agent whose `status_buffer` is subscribed to the
/// pool's [`StatusBroadcaster`]. Pair with [`drain_agent_status_buffer`] to
/// inspect the events accumulated after publishing via
/// `pool.status_broadcaster()`.
pub fn inject_test_agent_with_live_buffer(
    &self,
    story_id: &str,
    agent_name: &str,
    status: AgentStatus,
) -> broadcast::Sender<AgentEvent> {
    let (sender, _receiver) = broadcast::channel::<AgentEvent>(64);

    // Subscribe the buffer to this pool's broadcaster up front so the
    // injected agent mirrors a real session (project-scoped, passively
    // accumulating events from the moment it exists).
    let live_buffer = StatusEventBuffer::new(&self.status_broadcaster);

    let entry = StoryAgent {
        agent_name: agent_name.to_string(),
        status,
        worktree_info: None,
        session_id: None,
        tx: sender.clone(),
        task_handle: None,
        event_log: Arc::new(Mutex::new(Vec::new())),
        completion: None,
        project_root: None,
        log_session_id: None,
        merge_failure_reported: false,
        throttled: false,
        termination_reason: None,
        status_buffer: Some(live_buffer),
    };

    self.agents
        .lock()
        .unwrap()
        .insert(composite_key(story_id, agent_name), entry);

    sender
}
/// Test helper: drain every buffered status event from the given agent's
/// [`StatusEventBuffer`]. Yields `None` when no such agent exists, or when
/// the agent was injected without a buffer attached.
pub fn drain_agent_status_buffer(
    &self,
    story_id: &str,
    agent_name: &str,
) -> Option<Vec<BufferedItem>> {
    let key = composite_key(story_id, agent_name);
    let guard = self.agents.lock().unwrap();
    // `?` short-circuits to None for both a missing agent and a missing buffer.
    let agent = guard.get(&key)?;
    let buffer = agent.status_buffer.as_ref()?;
    Some(buffer.drain())
}
/// Inject a Running agent with a pre-built (possibly finished) task handle. /// Inject a Running agent with a pre-built (possibly finished) task handle.
/// Used by watchdog tests to simulate an orphaned agent. /// Used by watchdog tests to simulate an orphaned agent.
pub fn inject_test_agent_with_handle( pub fn inject_test_agent_with_handle(
@@ -173,6 +227,7 @@ impl AgentPool {
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: false,
termination_reason: None, termination_reason: None,
status_buffer: None,
}, },
); );
tx tx
+8
View File
@@ -86,6 +86,14 @@ pub(super) struct StoryAgent {
pub(super) throttled: bool, pub(super) throttled: bool,
/// Set when the watchdog terminates the agent for exceeding a limit. /// Set when the watchdog terminates the agent for exceeding a limit.
pub(super) termination_reason: Option<crate::agents::TerminationReason>, pub(super) termination_reason: Option<crate::agents::TerminationReason>,
/// Passive event accumulator scoped to this session's project.
///
/// Created at session start by subscribing to the pool's
/// [`crate::service::status::StatusBroadcaster`]. On restart a fresh
/// buffer is constructed, effectively clearing the previous subscription.
/// Events accumulate without invoking any model calls or side effects.
#[allow(dead_code)]
pub(super) status_buffer: Option<crate::service::status::buffer::StatusEventBuffer>,
} }
/// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry.
@@ -302,6 +302,7 @@ mod tests {
let (_perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel(); let (_perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel();
let services = Arc::new(crate::services::Services { let services = Arc::new(crate::services::Services {
project_root: tmp.path().to_path_buf(), project_root: tmp.path().to_path_buf(),
status: agents.status_broadcaster(),
agents, agents,
bot_name: "Bot".to_string(), bot_name: "Bot".to_string(),
bot_user_id: "whatsapp-bot".to_string(), bot_user_id: "whatsapp-bot".to_string(),
@@ -309,7 +310,6 @@ mod tests {
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())),
permission_timeout_secs: 120, permission_timeout_secs: 120,
status: Arc::new(crate::service::status::StatusBroadcaster::new()),
}); });
Arc::new(WhatsAppWebhookContext { Arc::new(WhatsAppWebhookContext {
services, services,
+3 -2
View File
@@ -119,16 +119,17 @@ impl AppContext {
let timer_store = Arc::new(TimerStore::load( let timer_store = Arc::new(TimerStore::load(
project_root.join(".huskies").join("timers.json"), project_root.join(".huskies").join("timers.json"),
)); ));
let agents = Arc::new(AgentPool::new(3001, watcher_tx.clone()));
let services = Arc::new(Services { let services = Arc::new(Services {
project_root: project_root.clone(), project_root: project_root.clone(),
agents: Arc::new(AgentPool::new(3001, watcher_tx.clone())), agents: Arc::clone(&agents),
bot_name: "Assistant".to_string(), bot_name: "Assistant".to_string(),
bot_user_id: String::new(), bot_user_id: String::new(),
ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
permission_timeout_secs: 120, permission_timeout_secs: 120,
status: Arc::new(crate::service::status::StatusBroadcaster::new()), status: agents.status_broadcaster(),
}); });
Self { Self {
state: Arc::new(state), state: Arc::new(state),
+1 -1
View File
@@ -458,7 +458,7 @@ async fn main() -> Result<(), std::io::Error> {
.as_ref() .as_ref()
.map(|c| c.permission_timeout_secs) .map(|c| c.permission_timeout_secs)
.unwrap_or(120), .unwrap_or(120),
status: Arc::new(service::status::StatusBroadcaster::new()), status: agents.status_broadcaster(),
}); });
// Build WhatsApp webhook context if bot.toml configures transport = "whatsapp". // Build WhatsApp webhook context if bot.toml configures transport = "whatsapp".
+1 -1
View File
@@ -397,6 +397,7 @@ pub fn spawn_gateway_bot(
let services = std::sync::Arc::new(Services { let services = std::sync::Arc::new(Services {
project_root: config_dir.to_path_buf(), project_root: config_dir.to_path_buf(),
status: agents.status_broadcaster(),
agents, agents,
bot_name: "Assistant".to_string(), bot_name: "Assistant".to_string(),
bot_user_id: String::new(), bot_user_id: String::new(),
@@ -406,7 +407,6 @@ pub fn spawn_gateway_bot(
std::collections::HashMap::new(), std::collections::HashMap::new(),
)), )),
permission_timeout_secs: 120, permission_timeout_secs: 120,
status: std::sync::Arc::new(crate::service::status::StatusBroadcaster::new()),
}); });
let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load( let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load(
+3 -2
View File
@@ -52,16 +52,17 @@ impl Services {
/// bot display name. /// bot display name.
pub fn new_test(project_root: std::path::PathBuf, bot_name: String) -> std::sync::Arc<Self> { pub fn new_test(project_root: std::path::PathBuf, bot_name: String) -> std::sync::Arc<Self> {
let (_perm_tx, perm_rx) = mpsc::unbounded_channel(); let (_perm_tx, perm_rx) = mpsc::unbounded_channel();
let agents = std::sync::Arc::new(crate::agents::AgentPool::new_test(3000));
std::sync::Arc::new(Self { std::sync::Arc::new(Self {
project_root, project_root,
agents: std::sync::Arc::new(crate::agents::AgentPool::new_test(3000)), status: agents.status_broadcaster(),
agents,
bot_name, bot_name,
bot_user_id: String::new(), bot_user_id: String::new(),
ambient_rooms: std::sync::Arc::new(std::sync::Mutex::new(HashSet::new())), ambient_rooms: std::sync::Arc::new(std::sync::Mutex::new(HashSet::new())),
perm_rx: std::sync::Arc::new(TokioMutex::new(perm_rx)), perm_rx: std::sync::Arc::new(TokioMutex::new(perm_rx)),
pending_perm_replies: std::sync::Arc::new(TokioMutex::new(HashMap::new())), pending_perm_replies: std::sync::Arc::new(TokioMutex::new(HashMap::new())),
permission_timeout_secs: 120, permission_timeout_secs: 120,
status: std::sync::Arc::new(StatusBroadcaster::new()),
}) })
} }
} }