huskies: merge 639_refactor_migrate_whatsapp_transport_to_services_bundle
This commit is contained in:
@@ -29,7 +29,7 @@ pub(super) async fn handle_incoming_message(
|
|||||||
// If there is a pending permission prompt for this sender, interpret the
|
// If there is a pending permission prompt for this sender, interpret the
|
||||||
// message as a yes/no response instead of starting a new command/LLM flow.
|
// message as a yes/no response instead of starting a new command/LLM flow.
|
||||||
{
|
{
|
||||||
let mut pending = ctx.pending_perm_replies.lock().await;
|
let mut pending = ctx.services.pending_perm_replies.lock().await;
|
||||||
if let Some(tx) = pending.remove(sender) {
|
if let Some(tx) = pending.remove(sender) {
|
||||||
let decision = if is_permission_approval(message) {
|
let decision = if is_permission_approval(message) {
|
||||||
PermissionDecision::Approve
|
PermissionDecision::Approve
|
||||||
@@ -49,11 +49,11 @@ pub(super) async fn handle_incoming_message(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let dispatch = CommandDispatch {
|
let dispatch = CommandDispatch {
|
||||||
bot_name: &ctx.bot_name,
|
bot_name: &ctx.services.bot_name,
|
||||||
bot_user_id: &ctx.bot_user_id,
|
bot_user_id: &ctx.services.bot_user_id,
|
||||||
project_root: &ctx.project_root,
|
project_root: &ctx.services.project_root,
|
||||||
agents: &ctx.agents,
|
agents: &ctx.services.agents,
|
||||||
ambient_rooms: &ctx.ambient_rooms,
|
ambient_rooms: &ctx.services.ambient_rooms,
|
||||||
room_id: sender,
|
room_id: sender,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -69,8 +69,8 @@ pub(super) async fn handle_incoming_message(
|
|||||||
// Check for async commands (htop, delete).
|
// Check for async commands (htop, delete).
|
||||||
if let Some(htop_cmd) = crate::chat::transport::matrix::htop::extract_htop_command(
|
if let Some(htop_cmd) = crate::chat::transport::matrix::htop::extract_htop_command(
|
||||||
message,
|
message,
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&ctx.bot_user_id,
|
&ctx.services.bot_user_id,
|
||||||
) {
|
) {
|
||||||
use crate::chat::transport::matrix::htop::HtopCommand;
|
use crate::chat::transport::matrix::htop::HtopCommand;
|
||||||
slog!("[whatsapp] Handling htop command from {sender}");
|
slog!("[whatsapp] Handling htop command from {sender}");
|
||||||
@@ -87,7 +87,7 @@ pub(super) async fn handle_incoming_message(
|
|||||||
// On WhatsApp, send a single snapshot instead of a live-updating
|
// On WhatsApp, send a single snapshot instead of a live-updating
|
||||||
// dashboard since we can't edit messages.
|
// dashboard since we can't edit messages.
|
||||||
let snapshot = crate::chat::transport::matrix::htop::build_htop_message(
|
let snapshot = crate::chat::transport::matrix::htop::build_htop_message(
|
||||||
&ctx.agents,
|
&ctx.services.agents,
|
||||||
0,
|
0,
|
||||||
duration_secs,
|
duration_secs,
|
||||||
);
|
);
|
||||||
@@ -99,22 +99,22 @@ pub(super) async fn handle_incoming_message(
|
|||||||
|
|
||||||
if let Some(del_cmd) = crate::chat::transport::matrix::delete::extract_delete_command(
|
if let Some(del_cmd) = crate::chat::transport::matrix::delete::extract_delete_command(
|
||||||
message,
|
message,
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&ctx.bot_user_id,
|
&ctx.services.bot_user_id,
|
||||||
) {
|
) {
|
||||||
let response = match del_cmd {
|
let response = match del_cmd {
|
||||||
crate::chat::transport::matrix::delete::DeleteCommand::Delete { story_number } => {
|
crate::chat::transport::matrix::delete::DeleteCommand::Delete { story_number } => {
|
||||||
slog!("[whatsapp] Handling delete command from {sender}: story {story_number}");
|
slog!("[whatsapp] Handling delete command from {sender}: story {story_number}");
|
||||||
crate::chat::transport::matrix::delete::handle_delete(
|
crate::chat::transport::matrix::delete::handle_delete(
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&story_number,
|
&story_number,
|
||||||
&ctx.project_root,
|
&ctx.services.project_root,
|
||||||
&ctx.agents,
|
&ctx.services.agents,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
crate::chat::transport::matrix::delete::DeleteCommand::BadArgs => {
|
crate::chat::transport::matrix::delete::DeleteCommand::BadArgs => {
|
||||||
format!("Usage: `{} delete <number>`", ctx.bot_name)
|
format!("Usage: `{} delete <number>`", ctx.services.bot_name)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let _ = ctx.transport.send_message(sender, &response, "").await;
|
let _ = ctx.transport.send_message(sender, &response, "").await;
|
||||||
@@ -123,8 +123,8 @@ pub(super) async fn handle_incoming_message(
|
|||||||
|
|
||||||
if crate::chat::transport::matrix::rebuild::extract_rebuild_command(
|
if crate::chat::transport::matrix::rebuild::extract_rebuild_command(
|
||||||
message,
|
message,
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&ctx.bot_user_id,
|
&ctx.services.bot_user_id,
|
||||||
)
|
)
|
||||||
.is_some()
|
.is_some()
|
||||||
{
|
{
|
||||||
@@ -132,9 +132,9 @@ pub(super) async fn handle_incoming_message(
|
|||||||
let ack = "Rebuilding server… this may take a moment.";
|
let ack = "Rebuilding server… this may take a moment.";
|
||||||
let _ = ctx.transport.send_message(sender, ack, "").await;
|
let _ = ctx.transport.send_message(sender, ack, "").await;
|
||||||
let response = crate::chat::transport::matrix::rebuild::handle_rebuild(
|
let response = crate::chat::transport::matrix::rebuild::handle_rebuild(
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&ctx.project_root,
|
&ctx.services.project_root,
|
||||||
&ctx.agents,
|
&ctx.services.agents,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
let _ = ctx.transport.send_message(sender, &response, "").await;
|
let _ = ctx.transport.send_message(sender, &response, "").await;
|
||||||
@@ -143,22 +143,22 @@ pub(super) async fn handle_incoming_message(
|
|||||||
|
|
||||||
if let Some(rmtree_cmd) = crate::chat::transport::matrix::rmtree::extract_rmtree_command(
|
if let Some(rmtree_cmd) = crate::chat::transport::matrix::rmtree::extract_rmtree_command(
|
||||||
message,
|
message,
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&ctx.bot_user_id,
|
&ctx.services.bot_user_id,
|
||||||
) {
|
) {
|
||||||
let response = match rmtree_cmd {
|
let response = match rmtree_cmd {
|
||||||
crate::chat::transport::matrix::rmtree::RmtreeCommand::Rmtree { story_number } => {
|
crate::chat::transport::matrix::rmtree::RmtreeCommand::Rmtree { story_number } => {
|
||||||
slog!("[whatsapp] Handling rmtree command from {sender}: story {story_number}");
|
slog!("[whatsapp] Handling rmtree command from {sender}: story {story_number}");
|
||||||
crate::chat::transport::matrix::rmtree::handle_rmtree(
|
crate::chat::transport::matrix::rmtree::handle_rmtree(
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&story_number,
|
&story_number,
|
||||||
&ctx.project_root,
|
&ctx.services.project_root,
|
||||||
&ctx.agents,
|
&ctx.services.agents,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
crate::chat::transport::matrix::rmtree::RmtreeCommand::BadArgs => {
|
crate::chat::transport::matrix::rmtree::RmtreeCommand::BadArgs => {
|
||||||
format!("Usage: `{} rmtree <number>`", ctx.bot_name)
|
format!("Usage: `{} rmtree <number>`", ctx.services.bot_name)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let _ = ctx.transport.send_message(sender, &response, "").await;
|
let _ = ctx.transport.send_message(sender, &response, "").await;
|
||||||
@@ -167,8 +167,8 @@ pub(super) async fn handle_incoming_message(
|
|||||||
|
|
||||||
if crate::chat::transport::matrix::reset::extract_reset_command(
|
if crate::chat::transport::matrix::reset::extract_reset_command(
|
||||||
message,
|
message,
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&ctx.bot_user_id,
|
&ctx.services.bot_user_id,
|
||||||
)
|
)
|
||||||
.is_some()
|
.is_some()
|
||||||
{
|
{
|
||||||
@@ -180,7 +180,7 @@ pub(super) async fn handle_incoming_message(
|
|||||||
.or_insert_with(RoomConversation::default);
|
.or_insert_with(RoomConversation::default);
|
||||||
conv.session_id = None;
|
conv.session_id = None;
|
||||||
conv.entries.clear();
|
conv.entries.clear();
|
||||||
save_whatsapp_history(&ctx.project_root, &guard);
|
save_whatsapp_history(&ctx.services.project_root, &guard);
|
||||||
}
|
}
|
||||||
let _ = ctx
|
let _ = ctx
|
||||||
.transport
|
.transport
|
||||||
@@ -191,8 +191,8 @@ pub(super) async fn handle_incoming_message(
|
|||||||
|
|
||||||
if let Some(start_cmd) = crate::chat::transport::matrix::start::extract_start_command(
|
if let Some(start_cmd) = crate::chat::transport::matrix::start::extract_start_command(
|
||||||
message,
|
message,
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&ctx.bot_user_id,
|
&ctx.services.bot_user_id,
|
||||||
) {
|
) {
|
||||||
let response = match start_cmd {
|
let response = match start_cmd {
|
||||||
crate::chat::transport::matrix::start::StartCommand::Start {
|
crate::chat::transport::matrix::start::StartCommand::Start {
|
||||||
@@ -201,16 +201,16 @@ pub(super) async fn handle_incoming_message(
|
|||||||
} => {
|
} => {
|
||||||
slog!("[whatsapp] Handling start command from {sender}: story {story_number}");
|
slog!("[whatsapp] Handling start command from {sender}: story {story_number}");
|
||||||
crate::chat::transport::matrix::start::handle_start(
|
crate::chat::transport::matrix::start::handle_start(
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&story_number,
|
&story_number,
|
||||||
agent_hint.as_deref(),
|
agent_hint.as_deref(),
|
||||||
&ctx.project_root,
|
&ctx.services.project_root,
|
||||||
&ctx.agents,
|
&ctx.services.agents,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
crate::chat::transport::matrix::start::StartCommand::BadArgs => {
|
crate::chat::transport::matrix::start::StartCommand::BadArgs => {
|
||||||
format!("Usage: `{} start <number>`", ctx.bot_name)
|
format!("Usage: `{} start <number>`", ctx.services.bot_name)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let _ = ctx.transport.send_message(sender, &response, "").await;
|
let _ = ctx.transport.send_message(sender, &response, "").await;
|
||||||
@@ -219,8 +219,8 @@ pub(super) async fn handle_incoming_message(
|
|||||||
|
|
||||||
if let Some(assign_cmd) = crate::chat::transport::matrix::assign::extract_assign_command(
|
if let Some(assign_cmd) = crate::chat::transport::matrix::assign::extract_assign_command(
|
||||||
message,
|
message,
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&ctx.bot_user_id,
|
&ctx.services.bot_user_id,
|
||||||
) {
|
) {
|
||||||
let response = match assign_cmd {
|
let response = match assign_cmd {
|
||||||
crate::chat::transport::matrix::assign::AssignCommand::Assign {
|
crate::chat::transport::matrix::assign::AssignCommand::Assign {
|
||||||
@@ -231,16 +231,16 @@ pub(super) async fn handle_incoming_message(
|
|||||||
"[whatsapp] Handling assign command from {sender}: story {story_number} model {model}"
|
"[whatsapp] Handling assign command from {sender}: story {story_number} model {model}"
|
||||||
);
|
);
|
||||||
crate::chat::transport::matrix::assign::handle_assign(
|
crate::chat::transport::matrix::assign::handle_assign(
|
||||||
&ctx.bot_name,
|
&ctx.services.bot_name,
|
||||||
&story_number,
|
&story_number,
|
||||||
&model,
|
&model,
|
||||||
&ctx.project_root,
|
&ctx.services.project_root,
|
||||||
&ctx.agents,
|
&ctx.services.agents,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
crate::chat::transport::matrix::assign::AssignCommand::BadArgs => {
|
crate::chat::transport::matrix::assign::AssignCommand::BadArgs => {
|
||||||
format!("Usage: `{} assign <number> <model>`", ctx.bot_name)
|
format!("Usage: `{} assign <number> <model>`", ctx.services.bot_name)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let formatted = markdown_to_whatsapp(&response);
|
let formatted = markdown_to_whatsapp(&response);
|
||||||
@@ -266,7 +266,7 @@ async fn handle_llm_message(ctx: &WhatsAppWebhookContext, sender: &str, user_mes
|
|||||||
guard.get(sender).and_then(|conv| conv.session_id.clone())
|
guard.get(sender).and_then(|conv| conv.session_id.clone())
|
||||||
};
|
};
|
||||||
|
|
||||||
let bot_name = &ctx.bot_name;
|
let bot_name = &ctx.services.bot_name;
|
||||||
let prompt = format!(
|
let prompt = format!(
|
||||||
"[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}"
|
"[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}"
|
||||||
);
|
);
|
||||||
@@ -297,7 +297,7 @@ async fn handle_llm_message(ctx: &WhatsAppWebhookContext, sender: &str, user_mes
|
|||||||
let sent_any_chunk = Arc::new(AtomicBool::new(false));
|
let sent_any_chunk = Arc::new(AtomicBool::new(false));
|
||||||
let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk);
|
let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk);
|
||||||
|
|
||||||
let project_root_str = ctx.project_root.to_string_lossy().to_string();
|
let project_root_str = ctx.services.project_root.to_string_lossy().to_string();
|
||||||
let chat_fut = provider.chat_stream(
|
let chat_fut = provider.chat_stream(
|
||||||
&prompt,
|
&prompt,
|
||||||
&project_root_str,
|
&project_root_str,
|
||||||
@@ -319,7 +319,7 @@ async fn handle_llm_message(ctx: &WhatsAppWebhookContext, sender: &str, user_mes
|
|||||||
tokio::pin!(chat_fut);
|
tokio::pin!(chat_fut);
|
||||||
|
|
||||||
// Lock the permission receiver for the duration of this chat session.
|
// Lock the permission receiver for the duration of this chat session.
|
||||||
let mut perm_rx_guard = ctx.perm_rx.lock().await;
|
let mut perm_rx_guard = ctx.services.perm_rx.lock().await;
|
||||||
|
|
||||||
let result = loop {
|
let result = loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@@ -339,16 +339,16 @@ async fn handle_llm_message(ctx: &WhatsAppWebhookContext, sender: &str, user_mes
|
|||||||
|
|
||||||
// Store the response sender so the incoming message handler
|
// Store the response sender so the incoming message handler
|
||||||
// can resolve it when the user replies yes/no.
|
// can resolve it when the user replies yes/no.
|
||||||
ctx.pending_perm_replies
|
ctx.services.pending_perm_replies
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
.insert(sender.to_string(), perm_fwd.response_tx);
|
.insert(sender.to_string(), perm_fwd.response_tx);
|
||||||
|
|
||||||
// Spawn a timeout task: auto-deny if the user does not respond.
|
// Spawn a timeout task: auto-deny if the user does not respond.
|
||||||
let pending = Arc::clone(&ctx.pending_perm_replies);
|
let pending = Arc::clone(&ctx.services.pending_perm_replies);
|
||||||
let timeout_sender = sender.to_string();
|
let timeout_sender = sender.to_string();
|
||||||
let timeout_transport = Arc::clone(&ctx.transport);
|
let timeout_transport = Arc::clone(&ctx.transport);
|
||||||
let timeout_secs = ctx.permission_timeout_secs;
|
let timeout_secs = ctx.services.permission_timeout_secs;
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await;
|
tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await;
|
||||||
if let Some(tx) = pending.lock().await.remove(&timeout_sender) {
|
if let Some(tx) = pending.lock().await.remove(&timeout_sender) {
|
||||||
@@ -433,7 +433,7 @@ async fn handle_llm_message(ctx: &WhatsAppWebhookContext, sender: &str, user_mes
|
|||||||
conv.entries.drain(..excess);
|
conv.entries.drain(..excess);
|
||||||
}
|
}
|
||||||
|
|
||||||
save_whatsapp_history(&ctx.project_root, &guard);
|
save_whatsapp_history(&ctx.services.project_root, &guard);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -484,22 +484,25 @@ mod tests {
|
|||||||
let agents = Arc::new(AgentPool::new(3999, tx));
|
let agents = Arc::new(AgentPool::new(3999, tx));
|
||||||
let tracker = Arc::new(MessagingWindowTracker::new());
|
let tracker = Arc::new(MessagingWindowTracker::new());
|
||||||
let (_perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel();
|
let (_perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
Arc::new(WhatsAppWebhookContext {
|
let services = Arc::new(crate::services::Services {
|
||||||
verify_token: "tok".to_string(),
|
|
||||||
provider: "meta".to_string(),
|
|
||||||
transport: Arc::new(NullTransport),
|
|
||||||
project_root: tmp.path().to_path_buf(),
|
project_root: tmp.path().to_path_buf(),
|
||||||
agents,
|
agents,
|
||||||
bot_name: "Bot".to_string(),
|
bot_name: "Bot".to_string(),
|
||||||
bot_user_id: "whatsapp-bot".to_string(),
|
bot_user_id: "whatsapp-bot".to_string(),
|
||||||
ambient_rooms: Arc::new(std::sync::Mutex::new(Default::default())),
|
ambient_rooms: Arc::new(std::sync::Mutex::new(Default::default())),
|
||||||
|
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
|
||||||
|
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())),
|
||||||
|
permission_timeout_secs: 120,
|
||||||
|
});
|
||||||
|
Arc::new(WhatsAppWebhookContext {
|
||||||
|
services,
|
||||||
|
verify_token: "tok".to_string(),
|
||||||
|
provider: "meta".to_string(),
|
||||||
|
transport: Arc::new(NullTransport),
|
||||||
history: Arc::new(tokio::sync::Mutex::new(Default::default())),
|
history: Arc::new(tokio::sync::Mutex::new(Default::default())),
|
||||||
history_size: 20,
|
history_size: 20,
|
||||||
window_tracker: tracker,
|
window_tracker: tracker,
|
||||||
allowed_phones,
|
allowed_phones,
|
||||||
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
|
|
||||||
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())),
|
|
||||||
permission_timeout_secs: 120,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -18,14 +18,10 @@ pub use meta::WhatsAppTransport;
|
|||||||
pub use twilio::{TwilioWhatsAppTransport, extract_twilio_text_messages};
|
pub use twilio::{TwilioWhatsAppTransport, extract_twilio_text_messages};
|
||||||
|
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::sync::Arc;
|
||||||
use std::path::PathBuf;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use tokio::sync::{Mutex as TokioMutex, oneshot};
|
|
||||||
|
|
||||||
use crate::agents::AgentPool;
|
|
||||||
use crate::chat::ChatTransport;
|
use crate::chat::ChatTransport;
|
||||||
use crate::http::context::{PermissionDecision, PermissionForward};
|
use crate::services::Services;
|
||||||
use crate::slog;
|
use crate::slog;
|
||||||
use poem::{Request, Response, handler, http::StatusCode, web::Query};
|
use poem::{Request, Response, handler, http::StatusCode, web::Query};
|
||||||
|
|
||||||
@@ -110,16 +106,12 @@ pub struct VerifyQuery {
|
|||||||
|
|
||||||
/// Shared context for webhook handlers, injected via Poem's `Data` extractor.
|
/// Shared context for webhook handlers, injected via Poem's `Data` extractor.
|
||||||
pub struct WhatsAppWebhookContext {
|
pub struct WhatsAppWebhookContext {
|
||||||
|
/// Shared services bundle (project root, agent pool, bot identity, permissions).
|
||||||
|
pub services: Arc<Services>,
|
||||||
pub verify_token: String,
|
pub verify_token: String,
|
||||||
/// Active provider: `"meta"` (Meta Graph API) or `"twilio"` (Twilio REST API).
|
/// Active provider: `"meta"` (Meta Graph API) or `"twilio"` (Twilio REST API).
|
||||||
pub provider: String,
|
pub provider: String,
|
||||||
pub transport: Arc<dyn ChatTransport>,
|
pub transport: Arc<dyn ChatTransport>,
|
||||||
pub project_root: PathBuf,
|
|
||||||
pub agents: Arc<AgentPool>,
|
|
||||||
pub bot_name: String,
|
|
||||||
/// The bot's "user ID" for command dispatch (e.g. "whatsapp-bot").
|
|
||||||
pub bot_user_id: String,
|
|
||||||
pub ambient_rooms: Arc<Mutex<HashSet<String>>>,
|
|
||||||
/// Per-sender conversation history for LLM passthrough.
|
/// Per-sender conversation history for LLM passthrough.
|
||||||
pub history: WhatsAppConversationHistory,
|
pub history: WhatsAppConversationHistory,
|
||||||
/// Maximum number of conversation entries to keep per sender.
|
/// Maximum number of conversation entries to keep per sender.
|
||||||
@@ -129,12 +121,6 @@ pub struct WhatsAppWebhookContext {
|
|||||||
/// Phone numbers allowed to send messages to the bot.
|
/// Phone numbers allowed to send messages to the bot.
|
||||||
/// When empty, all numbers are allowed (backwards compatible).
|
/// When empty, all numbers are allowed (backwards compatible).
|
||||||
pub allowed_phones: Vec<String>,
|
pub allowed_phones: Vec<String>,
|
||||||
/// Permission requests from the MCP `prompt_permission` tool arrive here.
|
|
||||||
pub perm_rx: Arc<TokioMutex<tokio::sync::mpsc::UnboundedReceiver<PermissionForward>>>,
|
|
||||||
/// Pending permission replies keyed by sender phone number.
|
|
||||||
pub pending_perm_replies: Arc<TokioMutex<HashMap<String, oneshot::Sender<PermissionDecision>>>>,
|
|
||||||
/// Seconds before an unanswered permission prompt is auto-denied.
|
|
||||||
pub permission_timeout_secs: u64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// GET /webhook/whatsapp — webhook verification.
|
/// GET /webhook/whatsapp — webhook verification.
|
||||||
|
|||||||
+12
-20
@@ -564,8 +564,6 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
// Wrap perm_rx in Arc<Mutex> so it can be shared across the Services
|
// Wrap perm_rx in Arc<Mutex> so it can be shared across the Services
|
||||||
// bundle (AppContext + Matrix bot) and the webhook-based transports.
|
// bundle (AppContext + Matrix bot) and the webhook-based transports.
|
||||||
let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx));
|
let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx));
|
||||||
let perm_rx_for_whatsapp = Arc::clone(&perm_rx);
|
|
||||||
|
|
||||||
// Capture project root, agents Arc, and reconciliation sender before ctx
|
// Capture project root, agents Arc, and reconciliation sender before ctx
|
||||||
// is consumed by build_routes.
|
// is consumed by build_routes.
|
||||||
let startup_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
|
let startup_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
|
||||||
@@ -628,30 +626,17 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
template_name,
|
template_name,
|
||||||
))
|
))
|
||||||
};
|
};
|
||||||
let bot_name = cfg
|
|
||||||
.display_name
|
|
||||||
.clone()
|
|
||||||
.unwrap_or_else(|| "Assistant".to_string());
|
|
||||||
let root = startup_root.clone().unwrap();
|
let root = startup_root.clone().unwrap();
|
||||||
let history = chat::transport::whatsapp::load_whatsapp_history(&root);
|
let history = chat::transport::whatsapp::load_whatsapp_history(&root);
|
||||||
Arc::new(chat::transport::whatsapp::WhatsAppWebhookContext {
|
Arc::new(chat::transport::whatsapp::WhatsAppWebhookContext {
|
||||||
|
services: Arc::clone(&services),
|
||||||
verify_token: cfg.whatsapp_verify_token.clone().unwrap_or_default(),
|
verify_token: cfg.whatsapp_verify_token.clone().unwrap_or_default(),
|
||||||
provider,
|
provider,
|
||||||
transport,
|
transport,
|
||||||
project_root: root,
|
|
||||||
agents: Arc::clone(&startup_agents),
|
|
||||||
bot_name,
|
|
||||||
bot_user_id: "whatsapp-bot".to_string(),
|
|
||||||
ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
|
|
||||||
history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
|
history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
|
||||||
history_size: cfg.history_size,
|
history_size: cfg.history_size,
|
||||||
window_tracker: Arc::new(chat::transport::whatsapp::MessagingWindowTracker::new()),
|
window_tracker: Arc::new(chat::transport::whatsapp::MessagingWindowTracker::new()),
|
||||||
allowed_phones: cfg.whatsapp_allowed_phones.clone(),
|
allowed_phones: cfg.whatsapp_allowed_phones.clone(),
|
||||||
perm_rx: perm_rx_for_whatsapp,
|
|
||||||
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(
|
|
||||||
std::collections::HashMap::new(),
|
|
||||||
)),
|
|
||||||
permission_timeout_secs: cfg.permission_timeout_secs,
|
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -744,7 +729,7 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
// • Matrix: handled by spawn_bot() below; no action needed here.
|
// • Matrix: handled by spawn_bot() below; no action needed here.
|
||||||
if let Some(ref ctx) = whatsapp_ctx {
|
if let Some(ref ctx) = whatsapp_ctx {
|
||||||
let transport = Arc::clone(&ctx.transport);
|
let transport = Arc::clone(&ctx.transport);
|
||||||
let bot_name = ctx.bot_name.clone();
|
let bot_name = ctx.services.bot_name.clone();
|
||||||
let history: WhatsAppConversationHistory = Arc::clone(&ctx.history);
|
let history: WhatsAppConversationHistory = Arc::clone(&ctx.history);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let senders: Vec<String> = history.lock().await.keys().cloned().collect();
|
let senders: Vec<String> = history.lock().await.keys().cloned().collect();
|
||||||
@@ -905,7 +890,7 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
// Spawn stage-transition notification listeners for WhatsApp and Slack.
|
// Spawn stage-transition notification listeners for WhatsApp and Slack.
|
||||||
// These mirror the listener that the Matrix bot spawns internally.
|
// These mirror the listener that the Matrix bot spawns internally.
|
||||||
if let (Some(ctx), Some(root)) = (&whatsapp_ctx, &startup_root) {
|
if let (Some(ctx), Some(root)) = (&whatsapp_ctx, &startup_root) {
|
||||||
let ambient_rooms = Arc::clone(&ctx.ambient_rooms);
|
let ambient_rooms = Arc::clone(&ctx.services.ambient_rooms);
|
||||||
crate::service::notifications::spawn_notification_listener(
|
crate::service::notifications::spawn_notification_listener(
|
||||||
Arc::clone(&ctx.transport),
|
Arc::clone(&ctx.transport),
|
||||||
move || ambient_rooms.lock().unwrap().iter().cloned().collect(),
|
move || ambient_rooms.lock().unwrap().iter().cloned().collect(),
|
||||||
@@ -991,12 +976,19 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
|
|
||||||
// WhatsApp: read the current set of ambient rooms and notify each sender.
|
// WhatsApp: read the current set of ambient rooms and notify each sender.
|
||||||
if let Some(ref ctx) = whatsapp_ctx_for_shutdown {
|
if let Some(ref ctx) = whatsapp_ctx_for_shutdown {
|
||||||
let rooms: Vec<String> = ctx.ambient_rooms.lock().unwrap().iter().cloned().collect();
|
let rooms: Vec<String> = ctx
|
||||||
|
.services
|
||||||
|
.ambient_rooms
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
if !rooms.is_empty() {
|
if !rooms.is_empty() {
|
||||||
let wa_notifier = BotShutdownNotifier::new(
|
let wa_notifier = BotShutdownNotifier::new(
|
||||||
Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
|
Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
|
||||||
rooms,
|
rooms,
|
||||||
ctx.bot_name.clone(),
|
ctx.services.bot_name.clone(),
|
||||||
);
|
);
|
||||||
wa_notifier.notify(ShutdownReason::Manual).await;
|
wa_notifier.notify(ShutdownReason::Manual).await;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user