diff --git a/server/src/chat/transport/matrix/bot/context.rs b/server/src/chat/transport/matrix/bot/context.rs index f9d1c0d1..0ef3f714 100644 --- a/server/src/chat/transport/matrix/bot/context.rs +++ b/server/src/chat/transport/matrix/bot/context.rs @@ -1,24 +1,27 @@ //! Matrix bot context — shared state for the Matrix bot (rooms, history, permissions). -use crate::agents::AgentPool; use crate::chat::ChatTransport; -use crate::http::context::{PermissionDecision, PermissionForward}; use crate::service::timer::TimerStore; +use crate::services::Services; use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId, OwnedUserId}; -use std::collections::{BTreeMap, HashMap, HashSet}; -use std::path::PathBuf; +use std::collections::{BTreeMap, HashSet}; use std::sync::Arc; use tokio::sync::Mutex as TokioMutex; -use tokio::sync::{RwLock, mpsc, oneshot}; +use tokio::sync::RwLock; use super::history::ConversationHistory; /// Shared context injected into Matrix event handlers. #[derive(Clone)] pub struct BotContext { - pub bot_user_id: OwnedUserId, + /// Shared services bundle (project root, agent pool, bot identity, permissions). + pub services: Arc, + /// Matrix-specific parsed user ID (e.g. `@timmy:homeserver.local`). + /// Transport-specific — kept separate from `services.bot_user_id` (String) + /// because Matrix SDK APIs require `OwnedUserId` for comparisons and + /// `.localpart()` extraction. + pub matrix_user_id: OwnedUserId, /// All room IDs the bot listens in. pub target_room_ids: Vec, - pub project_root: PathBuf, pub allowed_users: Vec, /// Shared, per-room rolling conversation history. pub history: ConversationHistory, @@ -28,27 +31,6 @@ pub struct BotContext { /// bot so it can continue a conversation thread without requiring an /// explicit `@mention` on every follow-up. pub bot_sent_event_ids: Arc>>, - /// Receiver for permission requests from the MCP `prompt_permission` tool. - /// During an active chat the bot locks this to poll for incoming requests. - pub perm_rx: Arc>>, - /// Per-room pending permission reply senders. When a permission prompt is - /// posted to a room the oneshot sender is stored here; when the user - /// replies (yes/no) the event handler resolves it. - pub pending_perm_replies: - Arc>>>, - /// How long to wait for a user to respond to a permission prompt before - /// denying (fail-closed). - pub permission_timeout_secs: u64, - /// The name the bot uses to refer to itself. Derived from `display_name` - /// in bot.toml; defaults to "Assistant" when unset. - pub bot_name: String, - /// Set of room IDs where ambient mode is active. In ambient mode the bot - /// responds to all messages rather than only addressed ones. - /// Uses a sync mutex since locks are never held across await points. - /// Room IDs are stored as plain strings (platform-agnostic). - pub ambient_rooms: Arc>>, - /// Agent pool for checking agent availability. - pub agents: Arc, /// Per-room htop monitoring sessions. Keyed by room ID; each entry holds /// a stop-signal sender that the background task watches. pub htop_sessions: super::super::htop::HtopSessions, @@ -78,12 +60,12 @@ impl BotContext { /// Each project lives in a subdirectory named after the project, so the /// effective root for commands is `project_root / active_project_name`. /// In standalone (single-project) mode this returns `project_root` unchanged. - pub async fn effective_project_root(&self) -> PathBuf { + pub async fn effective_project_root(&self) -> std::path::PathBuf { if let Some(ref ap) = self.gateway_active_project { let name = ap.read().await.clone(); - self.project_root.join(&name) + self.services.project_root.join(&name) } else { - self.project_root.clone() + self.services.project_root.clone() } } @@ -138,6 +120,7 @@ impl BotContext { #[cfg(test)] mod tests { use super::*; + use std::collections::HashMap; use std::path::PathBuf; use tokio::sync::mpsc; @@ -145,6 +128,52 @@ mod tests { s.parse().unwrap() } + /// Build a test `Services` bundle with the given project root. + fn test_services(project_root: PathBuf) -> Arc { + let (_perm_tx, perm_rx) = mpsc::unbounded_channel(); + Arc::new(Services { + project_root, + agents: Arc::new(crate::agents::AgentPool::new_test(3000)), + bot_name: "Assistant".to_string(), + bot_user_id: "@bot:example.com".to_string(), + ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), + perm_rx: Arc::new(TokioMutex::new(perm_rx)), + pending_perm_replies: Arc::new(TokioMutex::new(HashMap::new())), + permission_timeout_secs: 120, + }) + } + + /// Build a minimal `BotContext` for testing with the given Services and + /// optional gateway active project. + fn test_bot_context( + services: Arc, + gateway_active_project: Option>>, + gateway_projects: Vec, + gateway_project_urls: BTreeMap, + ) -> BotContext { + BotContext { + services, + matrix_user_id: make_user_id("@bot:example.com"), + target_room_ids: vec![], + allowed_users: vec![], + history: Arc::new(TokioMutex::new(HashMap::new())), + history_size: 20, + bot_sent_event_ids: Arc::new(TokioMutex::new(std::collections::HashSet::new())), + htop_sessions: Arc::new(TokioMutex::new(HashMap::new())), + transport: Arc::new(crate::chat::transport::whatsapp::WhatsAppTransport::new( + "test-phone".to_string(), + "test-token".to_string(), + "pipeline_notification".to_string(), + )), + timer_store: Arc::new(crate::service::timer::TimerStore::load( + std::path::PathBuf::from("/tmp/timers.json"), + )), + gateway_active_project, + gateway_projects, + gateway_project_urls, + } + } + #[test] fn bot_context_is_clone() { // BotContext must be Clone for the Matrix event handler injection. @@ -154,36 +183,8 @@ mod tests { #[tokio::test] async fn effective_project_root_standalone_returns_project_root() { - // In standalone mode (gateway_active_project is None), the effective root - // must equal the project_root exactly. - let (_perm_tx, perm_rx) = mpsc::unbounded_channel(); - let ctx = BotContext { - bot_user_id: make_user_id("@bot:example.com"), - target_room_ids: vec![], - project_root: PathBuf::from("/projects/myapp"), - allowed_users: vec![], - history: Arc::new(TokioMutex::new(std::collections::HashMap::new())), - history_size: 20, - bot_sent_event_ids: Arc::new(TokioMutex::new(std::collections::HashSet::new())), - perm_rx: Arc::new(TokioMutex::new(perm_rx)), - pending_perm_replies: Arc::new(TokioMutex::new(std::collections::HashMap::new())), - permission_timeout_secs: 120, - bot_name: "Assistant".to_string(), - ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), - agents: Arc::new(crate::agents::AgentPool::new_test(3000)), - htop_sessions: Arc::new(TokioMutex::new(std::collections::HashMap::new())), - transport: Arc::new(crate::chat::transport::whatsapp::WhatsAppTransport::new( - "test-phone".to_string(), - "test-token".to_string(), - "pipeline_notification".to_string(), - )), - timer_store: Arc::new(crate::service::timer::TimerStore::load( - std::path::PathBuf::from("/tmp/timers.json"), - )), - gateway_active_project: None, - gateway_projects: vec![], - gateway_project_urls: BTreeMap::new(), - }; + let services = test_services(PathBuf::from("/projects/myapp")); + let ctx = test_bot_context(services, None, vec![], BTreeMap::new()); assert_eq!( ctx.effective_project_root().await, PathBuf::from("/projects/myapp") @@ -192,39 +193,17 @@ mod tests { #[tokio::test] async fn effective_project_root_gateway_uses_active_project_subdir() { - // In gateway mode, the effective root must be config_dir / active_project_name. - let (_perm_tx, perm_rx) = mpsc::unbounded_channel(); + let services = test_services(PathBuf::from("/gateway")); let active = Arc::new(RwLock::new("huskies".to_string())); - let ctx = BotContext { - bot_user_id: make_user_id("@bot:example.com"), - target_room_ids: vec![], - project_root: PathBuf::from("/gateway"), - allowed_users: vec![], - history: Arc::new(TokioMutex::new(std::collections::HashMap::new())), - history_size: 20, - bot_sent_event_ids: Arc::new(TokioMutex::new(std::collections::HashSet::new())), - perm_rx: Arc::new(TokioMutex::new(perm_rx)), - pending_perm_replies: Arc::new(TokioMutex::new(std::collections::HashMap::new())), - permission_timeout_secs: 120, - bot_name: "Assistant".to_string(), - ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), - agents: Arc::new(crate::agents::AgentPool::new_test(3000)), - htop_sessions: Arc::new(TokioMutex::new(std::collections::HashMap::new())), - transport: Arc::new(crate::chat::transport::whatsapp::WhatsAppTransport::new( - "test-phone".to_string(), - "test-token".to_string(), - "pipeline_notification".to_string(), - )), - timer_store: Arc::new(crate::service::timer::TimerStore::load( - std::path::PathBuf::from("/tmp/timers.json"), - )), - gateway_active_project: Some(Arc::clone(&active)), - gateway_projects: vec!["huskies".into(), "robot-studio".into()], - gateway_project_urls: BTreeMap::from([ + let ctx = test_bot_context( + services, + Some(Arc::clone(&active)), + vec!["huskies".into(), "robot-studio".into()], + BTreeMap::from([ ("huskies".into(), "http://localhost:3001".into()), ("robot-studio".into(), "http://localhost:3002".into()), ]), - }; + ); assert_eq!( ctx.effective_project_root().await, PathBuf::from("/gateway/huskies") @@ -233,46 +212,23 @@ mod tests { #[tokio::test] async fn effective_project_root_gateway_reflects_project_switch() { - // Switching the active project must change the effective root. - let (_perm_tx, perm_rx) = mpsc::unbounded_channel(); + let services = test_services(PathBuf::from("/gateway")); let active = Arc::new(RwLock::new("huskies".to_string())); - let ctx = BotContext { - bot_user_id: make_user_id("@bot:example.com"), - target_room_ids: vec![], - project_root: PathBuf::from("/gateway"), - allowed_users: vec![], - history: Arc::new(TokioMutex::new(std::collections::HashMap::new())), - history_size: 20, - bot_sent_event_ids: Arc::new(TokioMutex::new(std::collections::HashSet::new())), - perm_rx: Arc::new(TokioMutex::new(perm_rx)), - pending_perm_replies: Arc::new(TokioMutex::new(std::collections::HashMap::new())), - permission_timeout_secs: 120, - bot_name: "Assistant".to_string(), - ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), - agents: Arc::new(crate::agents::AgentPool::new_test(3000)), - htop_sessions: Arc::new(TokioMutex::new(std::collections::HashMap::new())), - transport: Arc::new(crate::chat::transport::whatsapp::WhatsAppTransport::new( - "test-phone".to_string(), - "test-token".to_string(), - "pipeline_notification".to_string(), - )), - timer_store: Arc::new(crate::service::timer::TimerStore::load( - std::path::PathBuf::from("/tmp/timers.json"), - )), - gateway_active_project: Some(Arc::clone(&active)), - gateway_projects: vec!["huskies".into(), "robot-studio".into()], - gateway_project_urls: BTreeMap::from([ + let ctx = test_bot_context( + services, + Some(Arc::clone(&active)), + vec!["huskies".into(), "robot-studio".into()], + BTreeMap::from([ ("huskies".into(), "http://localhost:3001".into()), ("robot-studio".into(), "http://localhost:3002".into()), ]), - }; + ); assert_eq!( ctx.effective_project_root().await, PathBuf::from("/gateway/huskies") ); - // Simulate switch_project changing the active project. *active.write().await = "robot-studio".to_string(); assert_eq!( @@ -283,37 +239,8 @@ mod tests { #[test] fn bot_context_has_no_require_verified_devices_field() { - // Verification is always on — BotContext no longer has a toggle field. - // This test verifies the struct can be constructed and cloned without it. - let (_perm_tx, perm_rx) = mpsc::unbounded_channel(); - let ctx = BotContext { - bot_user_id: make_user_id("@bot:example.com"), - target_room_ids: vec![], - project_root: PathBuf::from("/tmp"), - allowed_users: vec![], - history: Arc::new(TokioMutex::new(HashMap::new())), - history_size: 20, - bot_sent_event_ids: Arc::new(TokioMutex::new(HashSet::new())), - perm_rx: Arc::new(TokioMutex::new(perm_rx)), - pending_perm_replies: Arc::new(TokioMutex::new(HashMap::new())), - permission_timeout_secs: 120, - bot_name: "Assistant".to_string(), - ambient_rooms: Arc::new(std::sync::Mutex::new(HashSet::new())), - agents: Arc::new(crate::agents::AgentPool::new_test(3000)), - htop_sessions: Arc::new(TokioMutex::new(HashMap::new())), - transport: Arc::new(crate::chat::transport::whatsapp::WhatsAppTransport::new( - "test-phone".to_string(), - "test-token".to_string(), - "pipeline_notification".to_string(), - )), - timer_store: Arc::new(crate::service::timer::TimerStore::load( - std::path::PathBuf::from("/tmp/timers.json"), - )), - gateway_active_project: None, - gateway_projects: vec![], - gateway_project_urls: BTreeMap::new(), - }; - // Clone must work (required by Matrix SDK event handler injection). + let services = test_services(PathBuf::from("/tmp")); + let ctx = test_bot_context(services, None, vec![], BTreeMap::new()); let _cloned = ctx.clone(); } } diff --git a/server/src/chat/transport/matrix/bot/messages.rs b/server/src/chat/transport/matrix/bot/messages.rs index 40a75e8f..a1f2ad55 100644 --- a/server/src/chat/transport/matrix/bot/messages.rs +++ b/server/src/chat/transport/matrix/bot/messages.rs @@ -52,7 +52,7 @@ pub(super) async fn on_room_message( } // Ignore the bot's own messages to prevent echo loops. - if ev.sender == ctx.bot_user_id { + if ev.sender == ctx.matrix_user_id { return; } @@ -74,10 +74,15 @@ pub(super) async fn on_room_message( // Only respond when the bot is directly addressed (mentioned by name/ID), // when the message is a reply to one of the bot's own messages, or when // ambient mode is enabled for this room. - let is_addressed = mentions_bot(&body, formatted_body.as_deref(), &ctx.bot_user_id) + let is_addressed = mentions_bot(&body, formatted_body.as_deref(), &ctx.matrix_user_id) || is_reply_to_bot(ev.content.relates_to.as_ref(), &ctx.bot_sent_event_ids).await; let room_id_str = incoming_room_id.to_string(); - let is_ambient = ctx.ambient_rooms.lock().unwrap().contains(&room_id_str); + let is_ambient = ctx + .services + .ambient_rooms + .lock() + .unwrap() + .contains(&room_id_str); // Always let "ambient on" through — it is the one command that must work // even when the bot is not mentioned and ambient mode is off, otherwise @@ -98,7 +103,7 @@ pub(super) async fn on_room_message( if is_ambient && !is_addressed && !is_ambient_on - && is_addressed_to_other(&body, &ctx.bot_user_id, &ctx.bot_name) + && is_addressed_to_other(&body, &ctx.matrix_user_id, &ctx.services.bot_name) { slog!( "[matrix-bot] Ignoring ambient message addressed to another bot (sender={})", @@ -144,8 +149,8 @@ pub(super) async fn on_room_message( // If there is a pending permission prompt for this room, interpret the // message as a yes/no response instead of starting a new chat. { - let mut pending = ctx.pending_perm_replies.lock().await; - if let Some(tx) = pending.remove(&incoming_room_id) { + let mut pending = ctx.services.pending_perm_replies.lock().await; + if let Some(tx) = pending.remove(incoming_room_id.as_str()) { let decision = if is_permission_approval(&body) { PermissionDecision::Approve } else { @@ -190,8 +195,8 @@ pub(super) async fn on_room_message( let stripped = crate::chat::util::strip_bot_mention( &user_message, - &ctx.bot_name, - ctx.bot_user_id.as_str(), + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), ) .trim() .trim_start_matches(|c: char| !c.is_alphanumeric()) @@ -257,11 +262,11 @@ pub(super) async fn on_room_message( // the LLM. All commands are registered in commands.rs — no special-casing // needed here. let dispatch = super::super::commands::CommandDispatch { - bot_name: &ctx.bot_name, - bot_user_id: ctx.bot_user_id.as_str(), + bot_name: &ctx.services.bot_name, + bot_user_id: ctx.matrix_user_id.as_str(), project_root: &effective_root, - agents: &ctx.agents, - ambient_rooms: &ctx.ambient_rooms, + agents: &ctx.services.agents, + ambient_rooms: &ctx.services.ambient_rooms, room_id: &room_id_str, }; if let Some((response, response_html)) = @@ -283,8 +288,8 @@ pub(super) async fn on_room_message( // start) and cannot be handled by the sync command registry. if let Some(assign_cmd) = super::super::assign::extract_assign_command( &user_message, - &ctx.bot_name, - ctx.bot_user_id.as_str(), + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), ) { let response = match assign_cmd { super::super::assign::AssignCommand::Assign { @@ -295,18 +300,18 @@ pub(super) async fn on_room_message( "[matrix-bot] Handling assign command from {sender}: story {story_number} model={model}" ); super::super::assign::handle_assign( - &ctx.bot_name, + &ctx.services.bot_name, &story_number, &model, &effective_root, - &ctx.agents, + &ctx.services.agents, ) .await } super::super::assign::AssignCommand::BadArgs => { format!( "Usage: `{} assign ` (e.g. `assign 42 opus`)", - ctx.bot_name + ctx.services.bot_name ) } }; @@ -326,8 +331,8 @@ pub(super) async fn on_room_message( // and cannot be handled by the sync command registry. if let Some(htop_cmd) = super::super::htop::extract_htop_command( &user_message, - &ctx.bot_name, - ctx.bot_user_id.as_str(), + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), ) { slog!("[matrix-bot] Handling htop command from {sender}: {htop_cmd:?}"); match htop_cmd { @@ -344,7 +349,7 @@ pub(super) async fn on_room_message( &ctx.transport, &room_id_str, &ctx.htop_sessions, - Arc::clone(&ctx.agents), + Arc::clone(&ctx.services.agents), duration_secs, ) .await; @@ -357,22 +362,22 @@ pub(super) async fn on_room_message( // and cannot be handled by the sync command registry. if let Some(del_cmd) = super::super::delete::extract_delete_command( &user_message, - &ctx.bot_name, - ctx.bot_user_id.as_str(), + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), ) { let response = match del_cmd { super::super::delete::DeleteCommand::Delete { story_number } => { slog!("[matrix-bot] Handling delete command from {sender}: story {story_number}"); super::super::delete::handle_delete( - &ctx.bot_name, + &ctx.services.bot_name, &story_number, &effective_root, - &ctx.agents, + &ctx.services.agents, ) .await } super::super::delete::DeleteCommand::BadArgs => { - format!("Usage: `{} delete `", ctx.bot_name) + format!("Usage: `{} delete `", ctx.services.bot_name) } }; let html = markdown_to_html(&response); @@ -391,22 +396,22 @@ pub(super) async fn on_room_message( // and cannot be handled by the sync command registry. if let Some(rmtree_cmd) = super::super::rmtree::extract_rmtree_command( &user_message, - &ctx.bot_name, - ctx.bot_user_id.as_str(), + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), ) { let response = match rmtree_cmd { super::super::rmtree::RmtreeCommand::Rmtree { story_number } => { slog!("[matrix-bot] Handling rmtree command from {sender}: story {story_number}"); super::super::rmtree::handle_rmtree( - &ctx.bot_name, + &ctx.services.bot_name, &story_number, &effective_root, - &ctx.agents, + &ctx.services.agents, ) .await } super::super::rmtree::RmtreeCommand::BadArgs => { - format!("Usage: `{} rmtree `", ctx.bot_name) + format!("Usage: `{} rmtree `", ctx.services.bot_name) } }; let html = markdown_to_html(&response); @@ -425,8 +430,8 @@ pub(super) async fn on_room_message( // be handled by the sync command registry. if let Some(start_cmd) = super::super::start::extract_start_command( &user_message, - &ctx.bot_name, - ctx.bot_user_id.as_str(), + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), ) { let response = match start_cmd { super::super::start::StartCommand::Start { @@ -437,18 +442,18 @@ pub(super) async fn on_room_message( "[matrix-bot] Handling start command from {sender}: story {story_number} agent={agent_hint:?}" ); super::super::start::handle_start( - &ctx.bot_name, + &ctx.services.bot_name, &story_number, agent_hint.as_deref(), &effective_root, - &ctx.agents, + &ctx.services.agents, ) .await } super::super::start::StartCommand::BadArgs => { format!( "Usage: `{} start ` or `{} start opus`", - ctx.bot_name, ctx.bot_name + ctx.services.bot_name, ctx.services.bot_name ) } }; @@ -468,17 +473,17 @@ pub(super) async fn on_room_message( // conversation history and cannot be handled by the sync command registry. if super::super::reset::extract_reset_command( &user_message, - &ctx.bot_name, - ctx.bot_user_id.as_str(), + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), ) .is_some() { slog!("[matrix-bot] Handling reset command from {sender}"); let response = super::super::reset::handle_reset( - &ctx.bot_name, + &ctx.services.bot_name, &incoming_room_id, &ctx.history, - &ctx.project_root, + &ctx.services.project_root, ) .await; let html = markdown_to_html(&response); @@ -497,8 +502,8 @@ pub(super) async fn on_room_message( // and cannot be handled by the sync command registry. if super::super::rebuild::extract_rebuild_command( &user_message, - &ctx.bot_name, - ctx.bot_user_id.as_str(), + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), ) .is_some() { @@ -514,9 +519,12 @@ pub(super) async fn on_room_message( { ctx.bot_sent_event_ids.lock().await.insert(event_id); } - let response = - super::super::rebuild::handle_rebuild(&ctx.bot_name, &ctx.project_root, &ctx.agents) - .await; + let response = super::super::rebuild::handle_rebuild( + &ctx.services.bot_name, + &ctx.services.project_root, + &ctx.services.agents, + ) + .await; let html = markdown_to_html(&response); if let Ok(msg_id) = ctx .transport @@ -534,8 +542,8 @@ pub(super) async fn on_room_message( if let Some(ref active_project) = ctx.gateway_active_project { let stripped = crate::chat::util::strip_bot_mention( &user_message, - &ctx.bot_name, - ctx.bot_user_id.as_str(), + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), ) .trim() .trim_start_matches(|c: char| !c.is_alphanumeric()) @@ -574,14 +582,14 @@ pub(super) async fn on_room_message( // be handled by the sync command registry. if let Some(timer_cmd) = crate::service::timer::extract_timer_command( &user_message, - &ctx.bot_name, - ctx.bot_user_id.as_str(), + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), ) { slog!("[matrix-bot] Handling timer command from {sender}: {timer_cmd:?}"); let response = crate::service::timer::handle_timer_command( timer_cmd, &ctx.timer_store, - &ctx.project_root, + &ctx.services.project_root, ) .await; let html = markdown_to_html(&response); @@ -620,7 +628,7 @@ pub(super) async fn handle_message( // The prompt is just the current message with sender attribution. // Prior conversation context is carried by the Claude Code session. - let bot_name = &ctx.bot_name; + let bot_name = &ctx.services.bot_name; let active_project_ctx = if let Some(ref ap) = ctx.gateway_active_project { let name = ap.read().await.clone(); format!("[Active project: {name}]\n") @@ -671,7 +679,7 @@ pub(super) async fn handle_message( // The gateway proxies tool calls to the active project automatically. // In standalone mode, use the project root directly. let project_root_str = if ctx.is_gateway() { - ctx.project_root.to_string_lossy().to_string() + ctx.services.project_root.to_string_lossy().to_string() } else { ctx.effective_project_root() .await @@ -701,7 +709,7 @@ pub(super) async fn handle_message( // Lock the permission receiver for the duration of this chat session. // Permission requests from the MCP `prompt_permission` tool arrive here. - let mut perm_rx_guard = ctx.perm_rx.lock().await; + let mut perm_rx_guard = ctx.services.perm_rx.lock().await; let result = loop { tokio::select! { @@ -726,18 +734,18 @@ pub(super) async fn handle_message( // Store the MCP oneshot sender so the event handler can // resolve it when the user replies yes/no. - ctx.pending_perm_replies + ctx.services.pending_perm_replies .lock() .await - .insert(room_id.clone(), perm_fwd.response_tx); + .insert(room_id.to_string(), perm_fwd.response_tx); // Spawn a timeout task: auto-deny if the user does not respond. - let pending = Arc::clone(&ctx.pending_perm_replies); - let timeout_room_id = room_id.clone(); + let pending = Arc::clone(&ctx.services.pending_perm_replies); + let timeout_room_id = room_id.to_string(); let timeout_transport = Arc::clone(&ctx.transport); let timeout_room_id_str = room_id_str.clone(); let timeout_sent_ids = Arc::clone(&ctx.bot_sent_event_ids); - let timeout_secs = ctx.permission_timeout_secs; + let timeout_secs = ctx.services.permission_timeout_secs; tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(timeout_secs)).await; if let Some(tx) = pending.lock().await.remove(&timeout_room_id) { @@ -844,7 +852,7 @@ pub(super) async fn handle_message( } // Persist to disk so history survives server restarts. - save_history(&ctx.project_root, &guard); + save_history(&ctx.services.project_root, &guard); } } diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index 9ad2afce..df8a5610 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -1,14 +1,13 @@ //! Matrix bot run loop — connects to the homeserver and processes sync events. -use crate::agents::AgentPool; +use crate::services::Services; use crate::slog; use matrix_sdk::ruma::OwnedRoomId; use matrix_sdk::{Client, LoopCtrl, config::SyncSettings}; use std::collections::{HashMap, HashSet}; -use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use tokio::sync::Mutex as TokioMutex; -use tokio::sync::{RwLock, mpsc, watch}; +use tokio::sync::{RwLock, watch}; use super::context::BotContext; use super::format::{format_startup_announcement, markdown_to_html}; @@ -22,16 +21,15 @@ use super::verification::{on_room_verification_request, on_to_device_verificatio #[allow(clippy::too_many_arguments)] pub async fn run_bot( config: super::super::config::BotConfig, - project_root: PathBuf, + services: Arc, watcher_rx: tokio::sync::broadcast::Receiver, watcher_rx_auto: tokio::sync::broadcast::Receiver, - perm_rx: Arc>>, - agents: Arc, shutdown_rx: watch::Receiver>, gateway_active_project: Option>>, gateway_projects: Vec, gateway_project_urls: std::collections::BTreeMap, ) -> Result<(), String> { + let project_root = &services.project_root; let store_path = project_root.join(".huskies").join("matrix_store"); let client = Client::builder() .homeserver_url(config.homeserver.as_deref().unwrap_or_default()) @@ -174,20 +172,22 @@ pub async fn run_bot( let poller_poll_interval = config.aggregated_notifications_poll_interval_secs; let poller_enabled = config.aggregated_notifications_enabled; - let persisted = load_history(&project_root); + let persisted = load_history(project_root); slog!( "[matrix-bot] Loaded persisted conversation history for {} room(s)", persisted.len() ); - // Restore persisted ambient rooms from config. - let persisted_ambient: HashSet = config.ambient_rooms.iter().cloned().collect(); - if !persisted_ambient.is_empty() { - slog!( - "[matrix-bot] Restored ambient mode for {} room(s): {:?}", - persisted_ambient.len(), - persisted_ambient - ); + // Ambient rooms are already restored in Services from bot.toml config. + { + let ambient = services.ambient_rooms.lock().unwrap(); + if !ambient.is_empty() { + slog!( + "[matrix-bot] Restored ambient mode for {} room(s): {:?}", + ambient.len(), + *ambient + ); + } } // Create the transport abstraction based on the configured transport type. @@ -222,11 +222,7 @@ pub async fn run_bot( } }; - let bot_name = config - .display_name - .clone() - .unwrap_or_else(|| "Assistant".to_string()); - let announce_bot_name = bot_name.clone(); + let announce_bot_name = services.bot_name.clone(); let timer_store = Arc::new(crate::service::timer::TimerStore::load( project_root.join(".huskies").join("timers.json"), @@ -238,19 +234,13 @@ pub async fn run_bot( ); let ctx = BotContext { - bot_user_id, + services, + matrix_user_id: bot_user_id, target_room_ids, - project_root, allowed_users: config.allowed_users, history: Arc::new(TokioMutex::new(persisted)), history_size: config.history_size, bot_sent_event_ids: Arc::new(TokioMutex::new(HashSet::new())), - perm_rx, - pending_perm_replies: Arc::new(TokioMutex::new(HashMap::new())), - permission_timeout_secs: config.permission_timeout_secs, - bot_name, - ambient_rooms: Arc::new(std::sync::Mutex::new(persisted_ambient)), - agents, htop_sessions: Arc::new(TokioMutex::new(HashMap::new())), transport: Arc::clone(&transport), timer_store, diff --git a/server/src/chat/transport/matrix/mod.rs b/server/src/chat/transport/matrix/mod.rs index 47640f84..7464ceee 100644 --- a/server/src/chat/transport/matrix/mod.rs +++ b/server/src/chat/transport/matrix/mod.rs @@ -30,13 +30,12 @@ pub mod transport_impl; pub use bot::{ConversationEntry, ConversationRole, RoomConversation}; pub use config::BotConfig; -use crate::agents::AgentPool; -use crate::http::context::PermissionForward; use crate::io::watcher::WatcherEvent; use crate::rebuild::ShutdownReason; +use crate::services::Services; use std::path::Path; use std::sync::Arc; -use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast, mpsc, watch}; +use tokio::sync::{RwLock, broadcast, watch}; /// Attempt to start the Matrix bot. /// @@ -48,9 +47,9 @@ use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast, mpsc, watch}; /// posts stage-transition messages to all configured rooms whenever a work /// item moves between pipeline stages. /// -/// `perm_rx` is the permission-request receiver shared with the MCP -/// `prompt_permission` tool. The bot locks it during active chat sessions -/// to surface permission prompts to the Matrix room and relay user decisions. +/// `services` is the shared services bundle containing the agent pool, +/// permission plumbing, and bot identity. The bot accesses these via +/// `Arc` rather than holding its own copies. /// /// `shutdown_rx` is a watch channel that delivers a `ShutdownReason` when the /// server is about to stop (SIGINT/SIGTERM or rebuild). The bot uses this to @@ -65,8 +64,7 @@ use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast, mpsc, watch}; pub fn spawn_bot( project_root: &Path, watcher_tx: broadcast::Sender, - perm_rx: Arc>>, - agents: Arc, + services: Arc, shutdown_rx: watch::Receiver>, gateway_active_project: Option>>, gateway_projects: Vec, @@ -95,17 +93,14 @@ pub fn spawn_bot( config.effective_room_ids() ); - let root = project_root.to_path_buf(); let watcher_rx = watcher_tx.subscribe(); let watcher_rx_auto = watcher_tx.subscribe(); let handle = tokio::spawn(async move { if let Err(e) = bot::run_bot( config, - root, + services, watcher_rx, watcher_rx_auto, - perm_rx, - agents, shutdown_rx, gateway_active_project, gateway_projects, diff --git a/server/src/http/agents.rs b/server/src/http/agents.rs index 2eae605a..ad75a7f8 100644 --- a/server/src/http/agents.rs +++ b/server/src/http/agents.rs @@ -215,12 +215,13 @@ impl AgentsApi { ) -> OpenApiResult> { let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; let info = svc::start_agent( - &self.ctx.agents, + &self.ctx.services.agents, &project_root, &payload.0.story_id, payload.0.agent_name.as_deref(), @@ -244,12 +245,13 @@ impl AgentsApi { async fn stop_agent(&self, payload: Json) -> OpenApiResult> { let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; svc::stop_agent( - &self.ctx.agents, + &self.ctx.services.agents, &project_root, &payload.0.story_id, &payload.0.agent_name, @@ -267,9 +269,14 @@ impl AgentsApi { /// on frontend startup. #[oai(path = "/agents", method = "get")] async fn list_agents(&self) -> OpenApiResult>> { - let project_root = self.ctx.agents.get_project_root(&self.ctx.state).ok(); - let agents = - svc::list_agents(&self.ctx.agents, project_root.as_deref()).map_err(map_svc_error)?; + let project_root = self + .ctx + .services + .agents + .get_project_root(&self.ctx.state) + .ok(); + let agents = svc::list_agents(&self.ctx.services.agents, project_root.as_deref()) + .map_err(map_svc_error)?; Ok(Json( agents @@ -290,6 +297,7 @@ impl AgentsApi { async fn get_agent_config(&self) -> OpenApiResult>> { let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; @@ -308,6 +316,7 @@ impl AgentsApi { async fn reload_config(&self) -> OpenApiResult>> { let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; @@ -329,13 +338,18 @@ impl AgentsApi { ) -> OpenApiResult> { let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; - let info = svc::create_worktree(&self.ctx.agents, &project_root, &payload.0.story_id) - .await - .map_err(map_svc_error)?; + let info = svc::create_worktree( + &self.ctx.services.agents, + &project_root, + &payload.0.story_id, + ) + .await + .map_err(map_svc_error)?; Ok(Json(WorktreeInfoResponse { story_id: payload.0.story_id, @@ -350,6 +364,7 @@ impl AgentsApi { async fn list_worktrees(&self) -> OpenApiResult>> { let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; @@ -378,6 +393,7 @@ impl AgentsApi { ) -> OpenApiResult> { let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; @@ -417,6 +433,7 @@ impl AgentsApi { // Slow path: fall back to results persisted in the story file. let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; @@ -446,6 +463,7 @@ impl AgentsApi { ) -> OpenApiResult> { let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; @@ -461,6 +479,7 @@ impl AgentsApi { async fn remove_worktree(&self, story_id: Path) -> OpenApiResult> { let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; @@ -483,6 +502,7 @@ impl AgentsApi { ) -> OpenApiResult> { let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; @@ -517,6 +537,7 @@ impl AgentsApi { async fn get_all_token_usage(&self) -> OpenApiResult> { let project_root = self .ctx + .services .agents .get_project_root(&self.ctx.state) .map_err(bad_request)?; @@ -604,9 +625,13 @@ mod tests { let ctx = AppContext::new_test(root); // Inject an agent for the archived story (completed) and one for an active story - ctx.agents - .inject_test_agent("79_story_archived", "coder-1", AgentStatus::Completed); - ctx.agents + ctx.services.agents.inject_test_agent( + "79_story_archived", + "coder-1", + AgentStatus::Completed, + ); + ctx.services + .agents .inject_test_agent("80_story_active", "coder-1", AgentStatus::Running); let api = AgentsApi { ctx: Arc::new(ctx) }; @@ -631,8 +656,11 @@ mod tests { let ctx = AppContext::new_test(tmp.path().to_path_buf()); // Clear the project_root so get_project_root returns Err *ctx.state.project_root.lock().unwrap() = None; - ctx.agents - .inject_test_agent("42_story_whatever", "coder-1", AgentStatus::Completed); + ctx.services.agents.inject_test_agent( + "42_story_whatever", + "coder-1", + AgentStatus::Completed, + ); let api = AgentsApi { ctx: Arc::new(ctx) }; let result = api.list_agents().await.unwrap().0; @@ -821,7 +849,8 @@ allowed_tools = ["Read", "Bash"] async fn stop_agent_succeeds_with_running_agent() { let tmp = TempDir::new().unwrap(); let ctx = AppContext::new_test(tmp.path().to_path_buf()); - ctx.agents + ctx.services + .agents .inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); let api = AgentsApi { ctx: Arc::new(ctx) }; let result = api diff --git a/server/src/http/agents_sse.rs b/server/src/http/agents_sse.rs index 4eeb385a..6bdbc7ed 100644 --- a/server/src/http/agents_sse.rs +++ b/server/src/http/agents_sse.rs @@ -18,7 +18,7 @@ pub async fn agent_stream( Path((story_id, agent_name)): Path<(String, String)>, ctx: Data<&Arc>, ) -> impl IntoResponse { - let mut rx = match ctx.agents.subscribe(&story_id, &agent_name) { + let mut rx = match ctx.services.agents.subscribe(&story_id, &agent_name) { Ok(rx) => rx, Err(e) => { return Response::builder() @@ -89,6 +89,7 @@ mod tests { // Inject a running agent and get its broadcast sender. let tx = ctx + .services .agents .inject_test_agent("1_story", "coder-1", AgentStatus::Running); @@ -152,6 +153,7 @@ mod tests { let ctx = Arc::new(AppContext::new_test(tmp.path().to_path_buf())); let tx = ctx + .services .agents .inject_test_agent("2_story", "coder-1", AgentStatus::Running); diff --git a/server/src/http/bot_command.rs b/server/src/http/bot_command.rs index d9004b32..c532d394 100644 --- a/server/src/http/bot_command.rs +++ b/server/src/http/bot_command.rs @@ -66,7 +66,7 @@ impl BotCommandApi { let cmd = body.command.trim().to_ascii_lowercase(); let args = body.args.trim(); - let response = svc::execute(&cmd, args, &project_root, &self.ctx.agents) + let response = svc::execute(&cmd, args, &project_root, &self.ctx.services.agents) .await .map_err(|e| match e { svc::Error::UnknownCommand(msg) => { diff --git a/server/src/http/context.rs b/server/src/http/context.rs index e88abb2c..84cb2865 100644 --- a/server/src/http/context.rs +++ b/server/src/http/context.rs @@ -1,8 +1,9 @@ //! Application context — shared state (`AppContext`) threaded through all HTTP handlers. -use crate::agents::{AgentPool, ReconciliationEvent}; +use crate::agents::ReconciliationEvent; use crate::io::watcher::WatcherEvent; use crate::rebuild::{BotShutdownNotifier, ShutdownReason}; use crate::service::timer::TimerStore; +use crate::services::Services; use crate::state::SessionState; use crate::store::JsonFileStore; use crate::workflow::WorkflowState; @@ -64,7 +65,8 @@ pub struct AppContext { pub state: Arc, pub store: Arc, pub workflow: Arc>, - pub agents: Arc, + /// Shared services bundle (agent pool, bot identity, permissions, etc.). + pub services: Arc, /// Broadcast channel for filesystem watcher events. WebSocket handlers /// subscribe to this to push lifecycle notifications to connected clients. pub watcher_tx: broadcast::Sender, @@ -76,9 +78,6 @@ pub struct AppContext { /// `prompt_permission` tool. The MCP handler sends a [`PermissionForward`] /// and awaits the oneshot response. pub perm_tx: mpsc::UnboundedSender, - /// Receiver for permission requests. The active WebSocket handler locks - /// this and polls for incoming permission forwards. - pub perm_rx: Arc>>, /// Child process of the QA app launched for manual testing. /// Only one instance runs at a time. pub qa_app_process: Arc>>, @@ -110,6 +109,7 @@ pub struct AppContext { #[cfg(test)] impl AppContext { pub fn new_test(project_root: std::path::PathBuf) -> Self { + use crate::agents::AgentPool; let state = SessionState::default(); *state.project_root.lock().unwrap() = Some(project_root.clone()); let store_path = project_root.join(".huskies_store.json"); @@ -119,15 +119,24 @@ impl AppContext { let timer_store = Arc::new(TimerStore::load( project_root.join(".huskies").join("timers.json"), )); + let services = Arc::new(Services { + project_root: project_root.clone(), + agents: Arc::new(AgentPool::new(3001, watcher_tx.clone())), + bot_name: "Assistant".to_string(), + bot_user_id: String::new(), + ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), + perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), + pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + permission_timeout_secs: 120, + }); Self { state: Arc::new(state), store: Arc::new(JsonFileStore::new(store_path).unwrap()), workflow: Arc::new(std::sync::Mutex::new(WorkflowState::default())), - agents: Arc::new(AgentPool::new(3001, watcher_tx.clone())), + services, watcher_tx, reconciliation_tx, perm_tx, - perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), qa_app_process: Arc::new(std::sync::Mutex::new(None)), bot_shutdown: None, matrix_shutdown_tx: None, diff --git a/server/src/http/mcp/agent_tools.rs b/server/src/http/mcp/agent_tools.rs index de8046ad..3a342eb5 100644 --- a/server/src/http/mcp/agent_tools.rs +++ b/server/src/http/mcp/agent_tools.rs @@ -14,8 +14,9 @@ pub(super) async fn tool_start_agent(args: &Value, ctx: &AppContext) -> Result Result Result Result { - let project_root = ctx.agents.get_project_root(&ctx.state).ok(); - let agents = ctx.agents.list_agents()?; + let project_root = ctx.services.agents.get_project_root(&ctx.state).ok(); + let agents = ctx.services.agents.list_agents()?; serde_json::to_string_pretty(&json!( agents .iter() @@ -125,7 +127,7 @@ pub(super) async fn tool_get_agent_output( .map(|n| n as usize); let filter = args.get("filter").and_then(|v| v.as_str()); - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; // Collect all matching log files, oldest first. let log_files = agent_log::list_story_log_files(&project_root, story_id, agent_name_filter); @@ -149,7 +151,7 @@ pub(super) async fn tool_get_agent_output( // writer failed and nothing was persisted to disk. if log_files.is_empty() && let Some(agent_name) = agent_name_filter - && let Ok(live_events) = ctx.agents.drain_events(story_id, agent_name) + && let Ok(live_events) = ctx.services.agents.drain_events(story_id, agent_name) && !live_events.is_empty() { all_lines.push(format!("=== {agent_name} (live) ===")); @@ -195,7 +197,7 @@ pub(super) async fn tool_get_agent_output( } pub(super) fn tool_get_agent_config(ctx: &AppContext) -> Result { - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; let config = ProjectConfig::load(&project_root)?; // Collect available (idle) agent names across all stages so the caller can @@ -207,7 +209,11 @@ pub(super) fn tool_get_agent_config(ctx: &AppContext) -> Result PipelineStage::Mergemaster, PipelineStage::Other, ] { - if let Ok(names) = ctx.agents.available_agents_for_stage(&config, stage) { + if let Ok(names) = ctx + .services + .agents + .available_agents_for_stage(&config, stage) + { available_names.extend(names); } } @@ -249,7 +255,7 @@ pub(super) fn tool_get_agent_remaining_turns_and_budget( .ok_or("Missing required argument: agent_name")?; // Verify the agent exists and is running/pending. - let agents = ctx.agents.list_agents()?; + let agents = ctx.services.agents.list_agents()?; let agent_info = agents .iter() .find(|a| a.story_id == story_id && a.agent_name == agent_name) @@ -275,7 +281,7 @@ pub(super) fn tool_get_agent_remaining_turns_and_budget( )); } - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; let config = ProjectConfig::load(&project_root)?; // Find the agent config (max_turns, max_budget_usd). @@ -341,6 +347,7 @@ pub(super) async fn tool_wait_for_agent(args: &Value, ctx: &AppContext) -> Resul .unwrap_or(300_000); // default: 5 minutes let info = ctx + .services .agents .wait_for_agent(story_id, agent_name, timeout_ms) .await?; @@ -377,8 +384,12 @@ pub(super) async fn tool_create_worktree(args: &Value, ctx: &AppContext) -> Resu .and_then(|v| v.as_str()) .ok_or("Missing required argument: story_id")?; - let project_root = ctx.agents.get_project_root(&ctx.state)?; - let info = ctx.agents.create_worktree(&project_root, story_id).await?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; + let info = ctx + .services + .agents + .create_worktree(&project_root, story_id) + .await?; serde_json::to_string_pretty(&json!({ "story_id": story_id, @@ -390,7 +401,7 @@ pub(super) async fn tool_create_worktree(args: &Value, ctx: &AppContext) -> Resu } pub(super) fn tool_list_worktrees(ctx: &AppContext) -> Result { - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; let entries = worktree::list_worktrees(&project_root)?; serde_json::to_string_pretty(&json!( @@ -411,7 +422,7 @@ pub(super) async fn tool_remove_worktree(args: &Value, ctx: &AppContext) -> Resu .and_then(|v| v.as_str()) .ok_or("Missing required argument: story_id")?; - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; let config = ProjectConfig::load(&project_root)?; worktree::remove_worktree_by_story_id(&project_root, story_id, &config).await?; @@ -876,7 +887,8 @@ stage = "coder" use crate::agents::AgentStatus; let tmp = tempfile::tempdir().unwrap(); let ctx = test_ctx(tmp.path()); - ctx.agents + ctx.services + .agents .inject_test_agent("41_story", "worker", AgentStatus::Completed); let result = tool_wait_for_agent( @@ -980,7 +992,8 @@ stage = "coder" use crate::agents::AgentStatus; let tmp = tempfile::tempdir().unwrap(); let ctx = test_ctx(tmp.path()); - ctx.agents + ctx.services + .agents .inject_test_agent("42_story", "coder-1", AgentStatus::Completed); let result = tool_get_agent_remaining_turns_and_budget( @@ -1004,7 +1017,8 @@ stage = "coder" let ctx = test_ctx(tmp.path()); ctx.store .set("project_root", json!(tmp.path().to_string_lossy().as_ref())); - ctx.agents + ctx.services + .agents .inject_test_agent("42_story", "coder-1", AgentStatus::Running); let result = tool_get_agent_remaining_turns_and_budget( diff --git a/server/src/http/mcp/diagnostics.rs b/server/src/http/mcp/diagnostics.rs index 5f498b03..06ca780a 100644 --- a/server/src/http/mcp/diagnostics.rs +++ b/server/src/http/mcp/diagnostics.rs @@ -46,7 +46,7 @@ pub(super) async fn tool_rebuild_and_restart(ctx: &AppContext) -> Result Result Result { serde_json::to_string_pretty(&json!({ "version": env!("CARGO_PKG_VERSION"), "build_hash": build_hash.trim(), - "port": ctx.agents.port(), + "port": ctx.services.agents.port(), })) .map_err(|e| format!("Serialization error: {e}")) } @@ -420,7 +420,7 @@ mod tests { // then respond with approval. The try_lock() inside tool_prompt_permission // must fail (lock held) so the request is forwarded rather than auto-denied. let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>(); - let perm_rx = ctx.perm_rx.clone(); + let perm_rx = ctx.services.perm_rx.clone(); tokio::spawn(async move { let mut rx = perm_rx.lock().await; let _ = ready_tx.send(()); // signal: lock is held @@ -459,7 +459,7 @@ mod tests { // Simulate an interactive session: lock perm_rx first, then deny. let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>(); - let perm_rx = ctx.perm_rx.clone(); + let perm_rx = ctx.services.perm_rx.clone(); tokio::spawn(async move { let mut rx = perm_rx.lock().await; let _ = ready_tx.send(()); // signal: lock is held @@ -637,8 +637,8 @@ mod tests { // then exec() will be called — which would replace our test process. // So we only test that the function *runs* without panicking up to // the agent-kill step. We do this by checking the pool is empty. - assert_eq!(ctx.agents.list_agents().unwrap().len(), 0); - ctx.agents.kill_all_children(); // should not panic on empty pool + assert_eq!(ctx.services.agents.list_agents().unwrap().len(), 0); + ctx.services.agents.kill_all_children(); // should not panic on empty pool } #[test] diff --git a/server/src/http/mcp/git_tools.rs b/server/src/http/mcp/git_tools.rs index 062f702e..5feb51c4 100644 --- a/server/src/http/mcp/git_tools.rs +++ b/server/src/http/mcp/git_tools.rs @@ -12,7 +12,7 @@ use std::path::PathBuf; /// Thin wrapper that obtains the project root from `ctx` and delegates to /// `service::git_ops::io::validate_worktree_path`. fn validate_worktree_path(worktree_path: &str, ctx: &AppContext) -> Result { - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; crate::service::git_ops::io::validate_worktree_path(worktree_path, &project_root) .map_err(|e| e.to_string()) } diff --git a/server/src/http/mcp/merge_tools.rs b/server/src/http/mcp/merge_tools.rs index cf84daea..4aefd182 100644 --- a/server/src/http/mcp/merge_tools.rs +++ b/server/src/http/mcp/merge_tools.rs @@ -32,15 +32,17 @@ pub(super) async fn tool_merge_agent_work( .map_err(|e| format!("Serialization error: {e}")); } - let project_root = ctx.agents.get_project_root(&ctx.state)?; - ctx.agents.start_merge_agent_work(&project_root, story_id)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; + ctx.services + .agents + .start_merge_agent_work(&project_root, story_id)?; // Block until the merge completes instead of returning immediately. // Uses tokio::time::sleep so the async executor is not blocked. // This prevents the mergemaster from burning all its turns polling // get_merge_status in a tight loop. let sid = story_id.to_string(); - let agents = ctx.agents.clone(); + let agents = ctx.services.agents.clone(); loop { tokio::time::sleep(std::time::Duration::from_secs(10)).await; if let Some(job) = agents.get_merge_status(&sid) { @@ -64,9 +66,13 @@ pub(super) fn tool_get_merge_status(args: &Value, ctx: &AppContext) -> Result { @@ -130,13 +136,14 @@ pub(super) async fn tool_move_story_to_merge( .and_then(|v| v.as_str()) .unwrap_or("mergemaster"); - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; // Move story from work/2_current/ to work/4_merge/ move_story_to_merge(&project_root, story_id)?; // Start the mergemaster agent on the story worktree let info = ctx + .services .agents .start_agent(&project_root, story_id, Some(agent_name), None, None) .await?; @@ -165,7 +172,7 @@ pub(super) fn tool_report_merge_failure(args: &Value, ctx: &AppContext) -> Resul .ok_or("Missing required argument: reason")?; slog!("[mergemaster] Merge failure reported for '{story_id}': {reason}"); - ctx.agents.set_merge_failure_reported(story_id); + ctx.services.agents.set_merge_failure_reported(story_id); // Broadcast the failure so the Matrix notification listener can post an // error message to configured rooms without coupling this tool to the bot. diff --git a/server/src/http/mcp/qa_tools.rs b/server/src/http/mcp/qa_tools.rs index 130c8fba..6cd0b169 100644 --- a/server/src/http/mcp/qa_tools.rs +++ b/server/src/http/mcp/qa_tools.rs @@ -21,13 +21,14 @@ pub(super) async fn tool_request_qa(args: &Value, ctx: &AppContext) -> Result Result Result Result Result Result Result .and_then(|v| v.as_str()) .ok_or("Missing required argument: story_id")?; - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; // Find the worktree path for this story let worktrees = crate::worktree::list_worktrees(&project_root)?; diff --git a/server/src/http/mcp/shell_tools.rs b/server/src/http/mcp/shell_tools.rs index a1b06f59..83cd52e4 100644 --- a/server/src/http/mcp/shell_tools.rs +++ b/server/src/http/mcp/shell_tools.rs @@ -22,7 +22,7 @@ const MAX_OUTPUT_LINES: usize = 100; /// Thin wrapper that obtains the project root from `ctx` and delegates to /// `service::shell::io::validate_working_dir`. fn validate_working_dir(working_dir: &str, ctx: &AppContext) -> Result { - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; crate::service::shell::io::validate_working_dir(working_dir, &project_root) .map_err(|e| e.to_string()) } @@ -264,7 +264,7 @@ pub(super) fn handle_run_command_sse( /// /// The child process is properly killed on timeout (no zombies). pub(super) async fn tool_run_tests(args: &Value, ctx: &AppContext) -> Result { - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; let working_dir = match args.get("worktree_path").and_then(|v| v.as_str()) { Some(wt) => validate_working_dir(wt, ctx)?, @@ -423,7 +423,7 @@ const TEST_POLL_BLOCK_SECS: u64 = 20; /// when the test finishes, or after 15s with `{"status": "running"}`. /// This server-side blocking prevents agents from wasting turns polling. pub(super) async fn tool_get_test_result(args: &Value, ctx: &AppContext) -> Result { - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; let working_dir = match args.get("worktree_path").and_then(|v| v.as_str()) { Some(wt) => validate_working_dir(wt, ctx)?, @@ -563,7 +563,7 @@ async fn run_script_tool( args: &Value, ctx: &AppContext, ) -> Result { - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; let working_dir = match args.get("worktree_path").and_then(|v| v.as_str()) { Some(wt) => validate_working_dir(wt, ctx)?, diff --git a/server/src/http/mcp/story_tools.rs b/server/src/http/mcp/story_tools.rs index a41b0a20..badc1b9b 100644 --- a/server/src/http/mcp/story_tools.rs +++ b/server/src/http/mcp/story_tools.rs @@ -300,7 +300,7 @@ pub(super) fn tool_accept_story(args: &Value, ctx: &AppContext) -> Result Result Result Result< .and_then(|v| v.as_str()) .ok_or("Missing required argument: story_id")?; - let project_root = ctx.agents.get_project_root(&ctx.state)?; + let project_root = ctx.services.agents.get_project_root(&ctx.state)?; let mut failed_steps: Vec = Vec::new(); // 0. Cancel any pending rate-limit retry timers for this story (bug 514). @@ -571,9 +571,10 @@ pub(super) async fn tool_delete_story(args: &Value, ctx: &AppContext) -> Result< } // 1. Stop any running agents for this story (best-effort). - if let Ok(agents) = ctx.agents.list_agents() { + if let Ok(agents) = ctx.services.agents.list_agents() { for agent in agents.iter().filter(|a| a.story_id == story_id) { match ctx + .services .agents .stop_agent(&project_root, story_id, &agent.agent_name) .await @@ -596,7 +597,7 @@ pub(super) async fn tool_delete_story(args: &Value, ctx: &AppContext) -> Result< } // 2. Remove agent pool entries. - let removed_count = ctx.agents.remove_agents_for_story(story_id); + let removed_count = ctx.services.agents.remove_agents_for_story(story_id); slog_warn!("[delete_story] Removed {removed_count} agent pool entries for '{story_id}'"); // 3. Remove worktree (best-effort). @@ -903,7 +904,7 @@ mod tests { ); let ctx = test_ctx(tmp.path()); - ctx.agents.inject_test_agent( + ctx.services.agents.inject_test_agent( "9921_story_active", "coder-1", crate::agents::AgentStatus::Running, diff --git a/server/src/http/project.rs b/server/src/http/project.rs index e3df681a..dca8aa6f 100644 --- a/server/src/http/project.rs +++ b/server/src/http/project.rs @@ -52,7 +52,7 @@ impl ProjectApi { payload.0.path, &self.ctx.state, self.ctx.store.as_ref(), - self.ctx.agents.port(), + self.ctx.services.agents.port(), ) .await .map_err(map_project_error)?; diff --git a/server/src/http/workflow/mod.rs b/server/src/http/workflow/mod.rs index 80018f19..2191edea 100644 --- a/server/src/http/workflow/mod.rs +++ b/server/src/http/workflow/mod.rs @@ -164,7 +164,7 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result { /// Build a map from story_id → AgentAssignment for all pending/running agents. fn build_active_agent_map(ctx: &AppContext) -> HashMap { - let agents = match ctx.agents.list_agents() { + let agents = match ctx.services.agents.list_agents() { Ok(a) => a, Err(_) => return HashMap::new(), }; @@ -569,7 +569,7 @@ mod tests { ); let ctx = crate::http::context::AppContext::new_test(root); - ctx.agents.inject_test_agent( + ctx.services.agents.inject_test_agent( "9860_story_test", "coder-1", crate::agents::AgentStatus::Running, @@ -604,7 +604,7 @@ mod tests { ); let ctx = crate::http::context::AppContext::new_test(root); - ctx.agents.inject_test_agent( + ctx.services.agents.inject_test_agent( "9861_story_done", "coder-1", crate::agents::AgentStatus::Completed, @@ -636,7 +636,7 @@ mod tests { ); let ctx = crate::http::context::AppContext::new_test(root); - ctx.agents.inject_test_agent( + ctx.services.agents.inject_test_agent( "9862_story_pending", "coder-1", crate::agents::AgentStatus::Pending, diff --git a/server/src/http/ws.rs b/server/src/http/ws.rs index f44f91e1..2d8ff6f5 100644 --- a/server/src/http/ws.rs +++ b/server/src/http/ws.rs @@ -102,7 +102,7 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem ); tokio::pin!(chat_fut); - let mut perm_rx = ctx.perm_rx.lock().await; + let mut perm_rx = ctx.services.perm_rx.lock().await; let chat_result = loop { tokio::select! { diff --git a/server/src/main.rs b/server/src/main.rs index 94db9bae..21c6b77a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -22,6 +22,7 @@ pub mod node_identity; pub(crate) mod pipeline_state; pub mod rebuild; mod service; +pub mod services; mod state; mod store; mod workflow; @@ -560,10 +561,9 @@ async fn main() -> Result<(), std::io::Error> { let watcher_rx_for_discord = watcher_tx.subscribe(); // Subscribe to watcher events for the per-project event buffer (gateway polling). let watcher_rx_for_events = watcher_tx.subscribe(); - // Wrap perm_rx in Arc so it can be shared with both the WebSocket - // handler (via AppContext) and the Matrix bot. + // Wrap perm_rx in Arc so it can be shared across the Services + // bundle (AppContext + Matrix bot) and the webhook-based transports. let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx)); - let perm_rx_for_bot = Arc::clone(&perm_rx); let perm_rx_for_whatsapp = Arc::clone(&perm_rx); let perm_rx_for_slack = Arc::clone(&perm_rx); let perm_rx_for_discord = Arc::clone(&perm_rx); @@ -576,6 +576,36 @@ async fn main() -> Result<(), std::io::Error> { // Clone for shutdown cleanup — kill orphaned PTY children before exiting. let agents_for_shutdown = Arc::clone(&agents); + // ── Construct the shared Services bundle ──────────────────────────── + // + // A single `Arc` is built here and cloned into `AppContext` + // and the Matrix `BotContext`. Bot-level fields (name, user-id, etc.) + // come from `bot.toml` when present; otherwise sensible defaults apply. + let bot_cfg_for_services = startup_root + .as_ref() + .and_then(|root| chat::transport::matrix::BotConfig::load(root)); + let services = Arc::new(services::Services { + project_root: startup_root.clone().unwrap_or_default(), + agents: Arc::clone(&agents), + bot_name: bot_cfg_for_services + .as_ref() + .and_then(|c| c.display_name.clone()) + .unwrap_or_else(|| "Assistant".to_string()), + bot_user_id: String::new(), + ambient_rooms: Arc::new(std::sync::Mutex::new( + bot_cfg_for_services + .as_ref() + .map(|c| c.ambient_rooms.iter().cloned().collect()) + .unwrap_or_default(), + )), + perm_rx: Arc::clone(&perm_rx), + pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), + permission_timeout_secs: bot_cfg_for_services + .as_ref() + .map(|c| c.permission_timeout_secs) + .unwrap_or(120), + }); + // Build WhatsApp webhook context if bot.toml configures transport = "whatsapp". let whatsapp_ctx: Option> = startup_root .as_ref() @@ -806,11 +836,10 @@ async fn main() -> Result<(), std::io::Error> { state: app_state, store, workflow, - agents, + services: Arc::clone(&services), watcher_tx, reconciliation_tx, perm_tx, - perm_rx, qa_app_process: Arc::new(std::sync::Mutex::new(None)), bot_shutdown: bot_shutdown_notifier.clone(), matrix_shutdown_tx: Some(Arc::clone(&matrix_shutdown_tx)), @@ -890,8 +919,7 @@ async fn main() -> Result<(), std::io::Error> { let _ = chat::transport::matrix::spawn_bot( root, watcher_tx_for_bot, - perm_rx_for_bot, - Arc::clone(&startup_agents), + Arc::clone(&services), matrix_shutdown_rx, None, vec![], diff --git a/server/src/service/gateway/io.rs b/server/src/service/gateway/io.rs index f5cedbd7..e7abfe35 100644 --- a/server/src/service/gateway/io.rs +++ b/server/src/service/gateway/io.rs @@ -382,6 +382,7 @@ pub fn spawn_gateway_bot( port: u16, ) -> Option { use crate::agents::AgentPool; + use crate::services::Services; use tokio::sync::{broadcast, mpsc}; let (watcher_tx, _) = broadcast::channel(16); @@ -394,11 +395,23 @@ pub fn spawn_gateway_bot( let agents = std::sync::Arc::new(AgentPool::new(port, watcher_tx.clone())); + let services = std::sync::Arc::new(Services { + project_root: config_dir.to_path_buf(), + agents, + bot_name: "Assistant".to_string(), + bot_user_id: String::new(), + ambient_rooms: std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), + perm_rx, + pending_perm_replies: std::sync::Arc::new(tokio::sync::Mutex::new( + std::collections::HashMap::new(), + )), + permission_timeout_secs: 120, + }); + crate::chat::transport::matrix::spawn_bot( config_dir, watcher_tx, - perm_rx, - agents, + services, shutdown_rx, Some(active_project), gateway_projects, diff --git a/server/src/services.rs b/server/src/services.rs new file mode 100644 index 00000000..c4112f38 --- /dev/null +++ b/server/src/services.rs @@ -0,0 +1,38 @@ +//! Shared services bundle — common state threaded through HTTP handlers and chat transports. +//! +//! `Services` bundles the fields that every transport (Matrix, Slack, Discord, +//! WhatsApp) and the HTTP/MCP layer need. A single `Arc` is +//! constructed once in `main.rs` and cloned into `AppContext` and each +//! transport's context struct. + +use crate::agents::AgentPool; +use crate::http::context::{PermissionDecision, PermissionForward}; +use std::collections::{HashMap, HashSet}; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::{Mutex as TokioMutex, mpsc, oneshot}; + +/// Shared state bundle constructed once at startup and cloned (via `Arc`) into +/// every context that needs access to the project root, agent pool, bot +/// identity, ambient-room set, or permission plumbing. +pub struct Services { + /// Absolute path to the project root directory. + pub project_root: PathBuf, + /// Agent pool for starting, stopping, and querying coding agents. + pub agents: Arc, + /// Display name the bot uses to identify itself (from `bot.toml`). + pub bot_name: String, + /// String representation of the bot's user ID (e.g. `"@timmy:hs.local"` + /// for Matrix, `"slack-bot"` for Slack). + pub bot_user_id: String, + /// Set of room/channel IDs where ambient mode is active. + pub ambient_rooms: Arc>>, + /// Receiver for permission requests from the MCP `prompt_permission` tool. + pub perm_rx: Arc>>, + /// Per-room pending permission reply senders, keyed by room/channel ID + /// as a plain string. + pub pending_perm_replies: Arc>>>, + /// Seconds to wait for a user to respond to a permission prompt before + /// auto-denying (fail-closed). + pub permission_timeout_secs: u64, +}