diff --git a/.huskies/source-map.json b/.huskies/source-map.json index cfb8eab3..c1c40d20 100644 --- a/.huskies/source-map.json +++ b/.huskies/source-map.json @@ -1672,6 +1672,7 @@ "mod story_tools", "mod timer_tools", "mod tools_list", + "mod trigger_tools", "mod wizard_tools", "fn mcp_get_handler", "fn mcp_post_handler" @@ -1769,6 +1770,11 @@ "server/src/http/mcp/tools_list/system_tools.rs": [ "fn system_tools" ], + "server/src/http/mcp/trigger_tools.rs": [ + "fn tool_schedule_event_trigger", + "fn tool_list_event_triggers", + "fn tool_cancel_event_trigger" + ], "server/src/http/mcp/wizard_tools.rs": [ "fn step_output_path", "fn is_script_step", @@ -2347,6 +2353,25 @@ "fn generate_permission_rule", "fn is_dominated_by_wildcard" ], + "server/src/service/event_triggers/mod.rs": [ + "mod store", + "struct TriggerPredicate", + "fn matches", + "enum TriggerAction", + "enum FireMode", + "struct EventTrigger" + ], + "server/src/service/event_triggers/store.rs": [ + "struct EventTriggerStore", + "fn load", + "fn add", + "fn list", + "fn cancel", + "fn cancel_batch", + "fn parse_action", + "fn parse_predicate", + "fn parse_mode" + ], "server/src/service/events/buffer.rs": [ "const MAX_BUFFER_SIZE", "enum StoredEvent", @@ -2500,6 +2525,7 @@ "mod bot_command", "mod common", "mod diagnostics", + "mod event_triggers", "mod events", "mod file_io", "mod gateway", @@ -2922,6 +2948,7 @@ "fn spawn_event_bridges", "fn spawn_tick_loop", "fn spawn_gateway_relay", + "fn spawn_event_trigger_subscriber", "fn spawn_startup_reconciliation" ], "server/src/state.rs": [ diff --git a/server/src/agent_mode/context.rs b/server/src/agent_mode/context.rs index af955beb..b4eda9e4 100644 --- a/server/src/agent_mode/context.rs +++ b/server/src/agent_mode/context.rs @@ -94,5 +94,10 @@ pub(super) fn build_agent_app_context( matrix_shutdown_tx: None, timer_store, scheduled_timer_store, + event_trigger_store: Arc::new( + crate::service::event_triggers::store::EventTriggerStore::load( + project_root.join(".huskies").join("event_triggers.json"), + ), + ), } } diff --git a/server/src/http/context.rs b/server/src/http/context.rs index a5de829c..c9fc2137 100644 --- a/server/src/http/context.rs +++ b/server/src/http/context.rs @@ -2,6 +2,7 @@ use crate::agents::ReconciliationEvent; use crate::io::watcher::WatcherEvent; use crate::rebuild::{BotShutdownNotifier, ShutdownReason}; +use crate::service::event_triggers::store::EventTriggerStore; use crate::service::timer::{ScheduledTimerStore, TimerStore}; use crate::services::Services; use crate::state::SessionState; @@ -77,6 +78,11 @@ pub struct AppContext { /// Generic scheduled-timer store for `schedule_timer` / `list_timers` / /// `cancel_timer` MCP tools. Persists to `.huskies/scheduled_timers.json`. pub scheduled_timer_store: Arc, + /// Persistent store for event-based pipeline triggers. + /// + /// Shared with the background subscriber so that triggers registered via + /// MCP are immediately visible to the subscriber without a disk round-trip. + pub event_trigger_store: Arc, } #[cfg(test)] @@ -93,6 +99,9 @@ impl AppContext { let timer_store = Arc::new(TimerStore::load( project_root.join(".huskies").join("timers.json"), )); + let event_trigger_store = Arc::new(EventTriggerStore::load( + project_root.join(".huskies").join("event_triggers.json"), + )); let scheduled_timer_store = Arc::new(ScheduledTimerStore::load( project_root.join(".huskies").join("scheduled_timers.json"), )); @@ -123,6 +132,7 @@ impl AppContext { matrix_shutdown_tx: None, timer_store, scheduled_timer_store, + event_trigger_store, } } } diff --git a/server/src/http/mcp/dispatch.rs b/server/src/http/mcp/dispatch.rs index d924c763..0346b0c3 100644 --- a/server/src/http/mcp/dispatch.rs +++ b/server/src/http/mcp/dispatch.rs @@ -4,7 +4,7 @@ use serde_json::Value; use super::{ agent_tools, diagnostics, git_tools, merge_tools, qa_tools, shell_tools, status_tools, - story_tools, timer_tools, wizard_tools, + story_tools, timer_tools, trigger_tools, wizard_tools, }; use crate::http::context::AppContext; @@ -128,6 +128,10 @@ pub async fn dispatch_tool_call( "wizard_confirm" => wizard_tools::tool_wizard_confirm(ctx), "wizard_skip" => wizard_tools::tool_wizard_skip(ctx), "wizard_retry" => wizard_tools::tool_wizard_retry(ctx), + // Event trigger tools + "schedule_event_trigger" => trigger_tools::tool_schedule_event_trigger(&args, ctx), + "list_event_triggers" => trigger_tools::tool_list_event_triggers(ctx), + "cancel_event_trigger" => trigger_tools::tool_cancel_event_trigger(&args, ctx), // Scheduled timer tools "schedule_timer" => timer_tools::tool_schedule_timer(&args, ctx), "list_timers" => timer_tools::tool_list_timers(ctx), diff --git a/server/src/http/mcp/mod.rs b/server/src/http/mcp/mod.rs index cd352dee..9f30e93e 100644 --- a/server/src/http/mcp/mod.rs +++ b/server/src/http/mcp/mod.rs @@ -29,6 +29,8 @@ pub mod story_tools; pub mod timer_tools; /// MCP tool schema definitions for `tools/list`. pub mod tools_list; +/// MCP tools for event-based pipeline triggers. +pub mod trigger_tools; /// MCP tools for the project setup wizard. pub mod wizard_tools; diff --git a/server/src/http/mcp/tools_list/mod.rs b/server/src/http/mcp/tools_list/mod.rs index 38020c27..4423fed6 100644 --- a/server/src/http/mcp/tools_list/mod.rs +++ b/server/src/http/mcp/tools_list/mod.rs @@ -108,10 +108,13 @@ mod tests { assert!(names.contains(&"unfreeze_story")); assert!(names.contains(&"find_orphaned_items")); assert!(names.contains(&"recover_half_written_items")); + assert!(names.contains(&"schedule_event_trigger")); + assert!(names.contains(&"list_event_triggers")); + assert!(names.contains(&"cancel_event_trigger")); assert!(names.contains(&"schedule_timer")); assert!(names.contains(&"list_timers")); assert!(names.contains(&"cancel_timer")); - assert_eq!(tools.len(), 79); + assert_eq!(tools.len(), 82); } #[test] diff --git a/server/src/http/mcp/tools_list/system_tools.rs b/server/src/http/mcp/tools_list/system_tools.rs index b4d864d6..e2638872 100644 --- a/server/src/http/mcp/tools_list/system_tools.rs +++ b/server/src/http/mcp/tools_list/system_tools.rs @@ -349,6 +349,64 @@ pub(super) fn system_tools() -> Vec { "properties": {} } }), + json!({ + "name": "schedule_event_trigger", + "description": "Register an event-based pipeline trigger that fires when a TransitionFired event matches the given predicate. Persists across server restarts. Returns the trigger id.", + "inputSchema": { + "type": "object", + "properties": { + "predicate": { + "type": "object", + "description": "Conditions that must all match for the trigger to fire. Omit a field to match any value (wildcard).", + "properties": { + "story_id": { "type": "string", "description": "Match only transitions for this story id (e.g. '42_my_feature')." }, + "from_stage": { "type": "string", "description": "Match only when the stage before the transition equals this label (e.g. 'Merge', 'Coding')." }, + "to_stage": { "type": "string", "description": "Match only when the stage after the transition equals this label (e.g. 'Done', 'MergeFailure')." }, + "event_kind": { "type": "string", "description": "Match only when the PipelineEvent kind equals this label (e.g. 'MergeFailed', 'Block', 'MergeSucceeded')." } + } + }, + "action": { + "type": "object", + "description": "What to do when the trigger fires.", + "properties": { + "type": { "type": "string", "enum": ["mcp", "prompt"], "description": "\"mcp\": call an MCP tool (no LLM). \"prompt\": spawn a focused agent with the text as its task." }, + "method": { "type": "string", "description": "For type=mcp: the MCP tool name to call (e.g. 'get_pipeline_status')." }, + "args": { "type": "object", "description": "For type=mcp: arguments to pass to the tool." }, + "text": { "type": "string", "description": "For type=prompt: the task text for the spawned agent." } + }, + "required": ["type"] + }, + "mode": { + "type": "string", + "enum": ["once", "persistent"], + "description": "\"once\" (default): remove the trigger after it fires once. \"persistent\": keep it active until cancel_event_trigger is called." + } + }, + "required": ["predicate", "action"] + } + }), + json!({ + "name": "list_event_triggers", + "description": "Return all currently registered event triggers with their ids, predicates, actions, and fire modes.", + "inputSchema": { + "type": "object", + "properties": {} + } + }), + json!({ + "name": "cancel_event_trigger", + "description": "Cancel and remove a registered event trigger by its id.", + "inputSchema": { + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "The trigger id returned by schedule_event_trigger." + } + }, + "required": ["id"] + } + }), json!({ "name": "find_orphaned_items", "description": "Find half-written (orphaned) pipeline items: story IDs that exist in the content store but have no live CRDT entry. These are invisible to all normal read paths (list_refactors, get_pipeline_status, etc.) and result from the bug 1001 split-brain race. Returns a list of orphaned IDs with their names and tombstone status. Use recover_half_written_items to fix them.", diff --git a/server/src/http/mcp/trigger_tools.rs b/server/src/http/mcp/trigger_tools.rs new file mode 100644 index 00000000..7be13aa0 --- /dev/null +++ b/server/src/http/mcp/trigger_tools.rs @@ -0,0 +1,149 @@ +//! MCP tools for event-based pipeline triggers: +//! `schedule_event_trigger`, `list_event_triggers`, `cancel_event_trigger`. + +use crate::http::context::AppContext; +use crate::service::event_triggers::store::{parse_action, parse_mode, parse_predicate}; +use serde_json::{Value, json}; + +/// Register a new event trigger that fires when a `TransitionFired` event matches the predicate. +pub(crate) fn tool_schedule_event_trigger( + args: &Value, + ctx: &AppContext, +) -> Result { + let predicate = parse_predicate(args)?; + let action = parse_action(args)?; + let mode = parse_mode(args); + + let trigger = ctx.event_trigger_store.add(predicate, action, mode)?; + + serde_json::to_string_pretty(&json!({ + "id": trigger.id, + "mode": format!("{:?}", trigger.mode).to_lowercase(), + "created_at": trigger.created_at.to_rfc3339(), + "message": format!("Trigger {} registered.", trigger.id), + })) + .map_err(|e| format!("Serialization error: {e}")) +} + +/// List all currently registered event triggers. +pub(crate) fn tool_list_event_triggers(ctx: &AppContext) -> Result { + let triggers = ctx.event_trigger_store.list(); + let items: Vec = triggers + .iter() + .map(|t| { + json!({ + "id": t.id, + "mode": format!("{:?}", t.mode).to_lowercase(), + "created_at": t.created_at.to_rfc3339(), + "predicate": { + "story_id": t.predicate.story_id, + "from_stage": t.predicate.from_stage, + "to_stage": t.predicate.to_stage, + "event_kind": t.predicate.event_kind, + }, + "action": serde_json::to_value(&t.action).unwrap_or(json!(null)), + }) + }) + .collect(); + + serde_json::to_string_pretty(&json!({ "triggers": items, "count": items.len() })) + .map_err(|e| format!("Serialization error: {e}")) +} + +/// Cancel (remove) a registered event trigger by its ID. +pub(crate) fn tool_cancel_event_trigger(args: &Value, ctx: &AppContext) -> Result { + let id = args + .get("id") + .and_then(|v| v.as_str()) + .ok_or("Missing required argument: id")?; + + if ctx.event_trigger_store.cancel(id) { + Ok(format!("Trigger {id} cancelled.")) + } else { + Err(format!("No trigger found with id '{id}'.")) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::http::test_helpers::test_ctx; + + #[test] + fn schedule_and_list() { + let tmp = tempfile::tempdir().unwrap(); + let ctx = test_ctx(tmp.path()); + + let result = tool_schedule_event_trigger( + &json!({ + "predicate": { "to_stage": "Done" }, + "action": { "type": "mcp", "method": "get_pipeline_status", "args": {} }, + "mode": "once" + }), + &ctx, + ) + .unwrap(); + let parsed: Value = serde_json::from_str(&result).unwrap(); + let id = parsed["id"].as_str().unwrap(); + assert!(!id.is_empty()); + + let list_result = tool_list_event_triggers(&ctx).unwrap(); + let list: Value = serde_json::from_str(&list_result).unwrap(); + assert_eq!(list["count"], 1); + } + + #[test] + fn cancel_existing_trigger() { + let tmp = tempfile::tempdir().unwrap(); + let ctx = test_ctx(tmp.path()); + + let result = tool_schedule_event_trigger( + &json!({ + "predicate": {}, + "action": { "type": "prompt", "text": "investigate" }, + "mode": "persistent" + }), + &ctx, + ) + .unwrap(); + let parsed: Value = serde_json::from_str(&result).unwrap(); + let id = parsed["id"].as_str().unwrap().to_string(); + + let cancel = tool_cancel_event_trigger(&json!({ "id": id }), &ctx).unwrap(); + assert!(cancel.contains("cancelled")); + + let list: Value = serde_json::from_str(&tool_list_event_triggers(&ctx).unwrap()).unwrap(); + assert_eq!(list["count"], 0); + } + + #[test] + fn cancel_missing_trigger_errors() { + let tmp = tempfile::tempdir().unwrap(); + let ctx = test_ctx(tmp.path()); + let result = tool_cancel_event_trigger(&json!({ "id": "nonexistent-id" }), &ctx); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("No trigger found")); + } + + #[test] + fn schedule_missing_predicate_errors() { + let tmp = tempfile::tempdir().unwrap(); + let ctx = test_ctx(tmp.path()); + let result = tool_schedule_event_trigger( + &json!({ "action": { "type": "mcp", "method": "get_version", "args": {} } }), + &ctx, + ); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("predicate")); + } + + #[test] + fn schedule_missing_action_errors() { + let tmp = tempfile::tempdir().unwrap(); + let ctx = test_ctx(tmp.path()); + let result = + tool_schedule_event_trigger(&json!({ "predicate": { "to_stage": "Done" } }), &ctx); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("action")); + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 30963ee6..71f80d1c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -283,6 +283,17 @@ async fn main() -> Result<(), std::io::Error> { let timer_store_for_tick = Arc::clone(&timer_store); let timer_store_for_bot = Arc::clone(&timer_store); + // Event-based pipeline trigger store. + let event_trigger_store = std::sync::Arc::new( + crate::service::event_triggers::store::EventTriggerStore::load( + startup_root + .as_ref() + .map(|r| r.join(".huskies").join("event_triggers.json")) + .unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-event-triggers.json")), + ), + ); + let event_trigger_store_for_subscriber = Arc::clone(&event_trigger_store); + // Generic scheduled-timer store for the `schedule_timer` MCP tool. let scheduled_timer_store = std::sync::Arc::new(crate::service::timer::ScheduledTimerStore::load( @@ -306,6 +317,7 @@ async fn main() -> Result<(), std::io::Error> { matrix_shutdown_tx: Some(Arc::clone(&bot_ctxs.matrix_shutdown_tx)), timer_store, scheduled_timer_store, + event_trigger_store, }; // Per-project event buffer for the gateway's `/api/events` poller. @@ -315,6 +327,9 @@ async fn main() -> Result<(), std::io::Error> { // Gateway relay task (pushes StatusEvents to a configured gateway). startup::tick_loop::spawn_gateway_relay(&startup_root, Arc::clone(&services.status)); + // Clone ctx before it is consumed by build_routes; AppContext is cheap to clone (all Arcs). + let ctx_for_triggers = ctx.clone(); + let app = build_routes( ctx.clone(), bot_ctxs.whatsapp_ctx.clone(), @@ -358,6 +373,14 @@ async fn main() -> Result<(), std::io::Error> { watcher_rx_for_discord, ); + // Event trigger subscriber: listens on TransitionFired and executes matching triggers. + startup::tick_loop::spawn_event_trigger_subscriber( + event_trigger_store_for_subscriber, + Arc::clone(&startup_agents), + startup_root.clone(), + ctx_for_triggers, + ); + // Reconcile completed worktrees and auto-assign free agents. startup::tick_loop::spawn_startup_reconciliation( startup_root.clone(), diff --git a/server/src/service/event_triggers/mod.rs b/server/src/service/event_triggers/mod.rs new file mode 100644 index 00000000..867504b7 --- /dev/null +++ b/server/src/service/event_triggers/mod.rs @@ -0,0 +1,174 @@ +//! Event-based pipeline triggers: register conditions on [`TransitionFired`] events +//! and automatically execute MCP tool calls or agent prompts when they fire. + +/// Persistent storage for event triggers backed by a JSON file. +pub mod store; + +use crate::pipeline_state::{TransitionFired, event_label, stage_label}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +/// Predicate for matching a [`TransitionFired`] event. +/// +/// Every `Some` field must match for the predicate to pass; `None` fields +/// are wildcards. All comparisons are case-insensitive. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TriggerPredicate { + /// Match only transitions for this story (e.g. `"42_my_feature"`). + pub story_id: Option, + /// Match only when `fired.before` has this label (e.g. `"Merge"`, `"Coding"`). + pub from_stage: Option, + /// Match only when `fired.after` has this label (e.g. `"Done"`, `"MergeFailure"`). + pub to_stage: Option, + /// Match only when the event has this label (e.g. `"MergeFailed"`, `"Block"`). + pub event_kind: Option, +} + +impl TriggerPredicate { + /// Returns `true` if every non-`None` field matches `fired`. + pub fn matches(&self, fired: &TransitionFired) -> bool { + if let Some(sid) = &self.story_id + && !fired.story_id.0.eq_ignore_ascii_case(sid) + { + return false; + } + if let Some(from) = &self.from_stage + && !stage_label(&fired.before).eq_ignore_ascii_case(from) + { + return false; + } + if let Some(to) = &self.to_stage + && !stage_label(&fired.after).eq_ignore_ascii_case(to) + { + return false; + } + if let Some(kind) = &self.event_kind + && !event_label(&fired.event).eq_ignore_ascii_case(kind) + { + return false; + } + true + } +} + +/// Action to execute when a trigger fires. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum TriggerAction { + /// Call an MCP tool deterministically (no LLM in the loop). + Mcp { + method: String, + args: serde_json::Value, + }, + /// Spawn a short-lived focused agent with this text as its task prompt. + Prompt { text: String }, +} + +/// Whether a trigger fires once then is removed, or fires repeatedly until cancelled. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum FireMode { + /// Remove the trigger after its first match. + Once, + /// Keep the trigger active until explicitly cancelled. + Persistent, +} + +/// A registered event trigger. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EventTrigger { + /// Unique identifier (UUIDv4). + pub id: String, + /// Predicate applied to every [`TransitionFired`]. + pub predicate: TriggerPredicate, + /// Action to execute on match. + pub action: TriggerAction, + /// Whether this trigger fires once or persists. + pub mode: FireMode, + /// When the trigger was registered. + pub created_at: DateTime, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::pipeline_state::{PipelineEvent, Stage, StoryId, TransitionFired}; + + fn fired(story: &str, before: Stage, after: Stage, event: PipelineEvent) -> TransitionFired { + TransitionFired { + story_id: StoryId(story.to_string()), + before, + after, + event, + at: Utc::now(), + } + } + + #[test] + fn predicate_wildcard_matches_anything() { + let p = TriggerPredicate { + story_id: None, + from_stage: None, + to_stage: None, + event_kind: None, + }; + let f = fired("42_foo", Stage::Backlog, Stage::Qa, PipelineEvent::DepsMet); + assert!(p.matches(&f)); + } + + #[test] + fn predicate_story_id_filter() { + let p = TriggerPredicate { + story_id: Some("42_foo".to_string()), + from_stage: None, + to_stage: None, + event_kind: None, + }; + let yes = fired("42_foo", Stage::Backlog, Stage::Qa, PipelineEvent::DepsMet); + let no = fired("99_bar", Stage::Backlog, Stage::Qa, PipelineEvent::DepsMet); + assert!(p.matches(&yes)); + assert!(!p.matches(&no)); + } + + #[test] + fn predicate_to_stage_filter() { + let p = TriggerPredicate { + story_id: None, + from_stage: None, + to_stage: Some("Done".to_string()), + event_kind: None, + }; + let yes = fired( + "1", + Stage::Merge { + feature_branch: crate::pipeline_state::BranchName("b".into()), + commits_ahead: std::num::NonZeroU32::new(1).unwrap(), + claim: None, + retries: 0, + server_start_time: None, + }, + Stage::Done { + merged_at: Utc::now(), + merge_commit: crate::pipeline_state::GitSha("abc".into()), + }, + PipelineEvent::MergeSucceeded { + merge_commit: crate::pipeline_state::GitSha("abc".into()), + }, + ); + let no = fired("1", Stage::Backlog, Stage::Qa, PipelineEvent::DepsMet); + assert!(p.matches(&yes)); + assert!(!p.matches(&no)); + } + + #[test] + fn predicate_event_kind_case_insensitive() { + let p = TriggerPredicate { + story_id: None, + from_stage: None, + to_stage: None, + event_kind: Some("depsmet".to_string()), + }; + let f = fired("1", Stage::Backlog, Stage::Qa, PipelineEvent::DepsMet); + assert!(p.matches(&f)); + } +} diff --git a/server/src/service/event_triggers/store.rs b/server/src/service/event_triggers/store.rs new file mode 100644 index 00000000..165c545f --- /dev/null +++ b/server/src/service/event_triggers/store.rs @@ -0,0 +1,332 @@ +//! Persistent store for registered event triggers, backed by a JSON file. +//! +//! Loaded at server startup and kept in sync on every mutation. Thread-safe +//! via an internal `Mutex`. + +use std::path::{Path, PathBuf}; +use std::sync::Mutex; + +use chrono::Utc; +use serde_json::Value; + +use super::{EventTrigger, FireMode, TriggerAction, TriggerPredicate}; + +/// Persistent store for [`EventTrigger`] entries. +pub struct EventTriggerStore { + path: PathBuf, + triggers: Mutex>, +} + +impl EventTriggerStore { + /// Load the store from `path`. Returns an empty store if the file does + /// not exist or cannot be parsed. + pub fn load(path: PathBuf) -> Self { + let triggers = if path.exists() { + std::fs::read_to_string(&path) + .ok() + .and_then(|s| serde_json::from_str::>(&s).ok()) + .unwrap_or_default() + } else { + Vec::new() + }; + Self { + path, + triggers: Mutex::new(triggers), + } + } + + fn persist(path: &Path, triggers: &[EventTrigger]) -> Result<(), String> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| format!("Failed to create directory: {e}"))?; + } + let json = serde_json::to_string_pretty(triggers) + .map_err(|e| format!("Serialization error: {e}"))?; + std::fs::write(path, json).map_err(|e| format!("Failed to write triggers: {e}")) + } + + /// Register a new trigger and persist to disk. + pub fn add( + &self, + predicate: TriggerPredicate, + action: TriggerAction, + mode: FireMode, + ) -> Result { + let trigger = EventTrigger { + id: uuid_v4(), + predicate, + action, + mode, + created_at: Utc::now(), + }; + let mut triggers = self.triggers.lock().unwrap(); + triggers.push(trigger.clone()); + Self::persist(&self.path, &triggers)?; + Ok(trigger) + } + + /// Return a snapshot of all registered triggers. + pub fn list(&self) -> Vec { + self.triggers.lock().unwrap().clone() + } + + /// Remove the trigger with `id`. Returns `true` if it was found and removed. + pub fn cancel(&self, id: &str) -> bool { + let mut triggers = self.triggers.lock().unwrap(); + let before = triggers.len(); + triggers.retain(|t| t.id != id); + let removed = triggers.len() < before; + if removed { + let _ = Self::persist(&self.path, &triggers); + } + removed + } + + /// Remove all triggers whose ids are in `ids` and return how many were removed. + /// + /// Used by the subscriber to delete `Once` triggers after they fire. + pub fn cancel_batch(&self, ids: &[String]) -> usize { + if ids.is_empty() { + return 0; + } + let mut triggers = self.triggers.lock().unwrap(); + let before = triggers.len(); + triggers.retain(|t| !ids.contains(&t.id)); + let removed = before - triggers.len(); + if removed > 0 { + let _ = Self::persist(&self.path, &triggers); + } + removed + } +} + +/// Generate a random UUIDv4-style identifier without pulling in the full uuid crate. +/// +/// Uses [`std::time`] entropy mixed with a thread-local counter. Not cryptographically +/// strong, but unique enough for trigger IDs. +fn uuid_v4() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .subsec_nanos(); + let hi = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + format!("{hi:016x}-{nanos:08x}-4000-0000-{hi:012x}") +} + +/// Parse a [`TriggerAction`] from the raw JSON supplied in an MCP args object. +pub fn parse_action(args: &Value) -> Result { + let action_obj = args + .get("action") + .ok_or("Missing required argument: action")?; + let action_type = action_obj + .get("type") + .and_then(|v| v.as_str()) + .ok_or("action.type must be a string (\"mcp\" or \"prompt\")")?; + + match action_type { + "mcp" => { + let method = action_obj + .get("method") + .and_then(|v| v.as_str()) + .ok_or("action.method is required for type=mcp")? + .to_string(); + let action_args = action_obj + .get("args") + .cloned() + .unwrap_or(serde_json::Value::Object(Default::default())); + Ok(TriggerAction::Mcp { + method, + args: action_args, + }) + } + "prompt" => { + let text = action_obj + .get("text") + .and_then(|v| v.as_str()) + .ok_or("action.text is required for type=prompt")? + .to_string(); + Ok(TriggerAction::Prompt { text }) + } + other => Err(format!( + "Unknown action type '{other}'; expected \"mcp\" or \"prompt\"" + )), + } +} + +/// Parse a [`TriggerPredicate`] from the raw JSON supplied in an MCP args object. +pub fn parse_predicate(args: &Value) -> Result { + let pred = args + .get("predicate") + .ok_or("Missing required argument: predicate")?; + Ok(TriggerPredicate { + story_id: pred + .get("story_id") + .and_then(|v| v.as_str()) + .map(str::to_string), + from_stage: pred + .get("from_stage") + .and_then(|v| v.as_str()) + .map(str::to_string), + to_stage: pred + .get("to_stage") + .and_then(|v| v.as_str()) + .map(str::to_string), + event_kind: pred + .get("event_kind") + .and_then(|v| v.as_str()) + .map(str::to_string), + }) +} + +/// Parse a [`FireMode`] from the raw JSON supplied in an MCP args object. +pub fn parse_mode(args: &Value) -> FireMode { + match args.get("mode").and_then(|v| v.as_str()) { + Some("persistent") => FireMode::Persistent, + _ => FireMode::Once, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn tmp_store() -> (TempDir, EventTriggerStore) { + let dir = TempDir::new().unwrap(); + let store = EventTriggerStore::load(dir.path().join("triggers.json")); + (dir, store) + } + + fn basic_pred() -> TriggerPredicate { + TriggerPredicate { + story_id: None, + from_stage: None, + to_stage: Some("Done".to_string()), + event_kind: None, + } + } + + fn basic_action() -> TriggerAction { + TriggerAction::Mcp { + method: "get_pipeline_status".to_string(), + args: serde_json::json!({}), + } + } + + #[test] + fn store_empty_on_missing_file() { + let (_dir, store) = tmp_store(); + assert!(store.list().is_empty()); + } + + #[test] + fn store_add_and_list() { + let (_dir, store) = tmp_store(); + let t = store + .add(basic_pred(), basic_action(), FireMode::Once) + .unwrap(); + let list = store.list(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].id, t.id); + assert!(matches!(list[0].mode, FireMode::Once)); + } + + #[test] + fn store_cancel_removes_entry() { + let (_dir, store) = tmp_store(); + let t = store + .add(basic_pred(), basic_action(), FireMode::Persistent) + .unwrap(); + assert!(store.cancel(&t.id)); + assert!(!store.cancel(&t.id)); + assert!(store.list().is_empty()); + } + + #[test] + fn store_persists_and_reloads() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("triggers.json"); + let id = { + let store = EventTriggerStore::load(path.clone()); + let t = store + .add(basic_pred(), basic_action(), FireMode::Persistent) + .unwrap(); + t.id.clone() + }; + let store2 = EventTriggerStore::load(path); + let list = store2.list(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].id, id); + } + + #[test] + fn store_cancel_batch() { + let (_dir, store) = tmp_store(); + let a = store + .add(basic_pred(), basic_action(), FireMode::Once) + .unwrap(); + let b = store + .add(basic_pred(), basic_action(), FireMode::Once) + .unwrap(); + let removed = store.cancel_batch(&[a.id, b.id]); + assert_eq!(removed, 2); + assert!(store.list().is_empty()); + } + + #[test] + fn parse_action_mcp() { + let args = serde_json::json!({ + "action": { "type": "mcp", "method": "get_version", "args": {} } + }); + let action = parse_action(&args).unwrap(); + assert!(matches!(action, TriggerAction::Mcp { .. })); + } + + #[test] + fn parse_action_prompt() { + let args = serde_json::json!({ + "action": { "type": "prompt", "text": "investigate the merge failure" } + }); + let action = parse_action(&args).unwrap(); + assert!(matches!(action, TriggerAction::Prompt { .. })); + } + + #[test] + fn parse_action_unknown_type_errors() { + let args = serde_json::json!({ "action": { "type": "webhook" } }); + assert!(parse_action(&args).is_err()); + } + + #[test] + fn parse_predicate_all_fields() { + let args = serde_json::json!({ + "predicate": { + "story_id": "42_foo", + "from_stage": "Coding", + "to_stage": "Done", + "event_kind": "MergeSucceeded" + } + }); + let pred = parse_predicate(&args).unwrap(); + assert_eq!(pred.story_id.as_deref(), Some("42_foo")); + assert_eq!(pred.from_stage.as_deref(), Some("Coding")); + assert_eq!(pred.to_stage.as_deref(), Some("Done")); + assert_eq!(pred.event_kind.as_deref(), Some("MergeSucceeded")); + } + + #[test] + fn parse_mode_defaults_to_once() { + let args = serde_json::json!({}); + assert_eq!(parse_mode(&args), FireMode::Once); + } + + #[test] + fn parse_mode_persistent() { + let args = serde_json::json!({ "mode": "persistent" }); + assert_eq!(parse_mode(&args), FireMode::Persistent); + } +} diff --git a/server/src/service/mod.rs b/server/src/service/mod.rs index 0fd3dafd..657ae4ba 100644 --- a/server/src/service/mod.rs +++ b/server/src/service/mod.rs @@ -15,6 +15,8 @@ pub mod bot_command; pub mod common; /// Diagnostics — server logs, CRDT dump, and permission management. pub mod diagnostics; +/// Event-based pipeline triggers: register, list, cancel, and execute on TransitionFired events. +pub mod event_triggers; /// Pipeline event buffer for SSE streaming. pub mod events; /// File I/O — path validation, read, write, and listing. diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 8a24d3b6..97248296 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -8,6 +8,8 @@ use crate::http::context::AppContext; use crate::http::mcp::dispatch::dispatch_tool_call; use crate::io; use crate::service; +use crate::service::event_triggers::store::EventTriggerStore; +use crate::service::event_triggers::{FireMode, TriggerAction}; use crate::service::status::StatusBroadcaster; use crate::service::timer::scheduled::{ ScheduledTimer, ScheduledTimerStore, TimerAction, TimerMode, @@ -304,6 +306,153 @@ pub(crate) fn spawn_gateway_relay(startup_root: &Option, status: Arc, + agents: Arc, + project_root: Option, + ctx: AppContext, +) { + let mut rx = crate::pipeline_state::subscribe_transitions(); + tokio::spawn(async move { + loop { + let fired = match rx.recv().await { + Ok(f) => f, + Err(broadcast::error::RecvError::Lagged(n)) => { + crate::slog!( + "[event-triggers] Lagged {n} transition events; some triggers may have been skipped" + ); + continue; + } + Err(broadcast::error::RecvError::Closed) => { + crate::slog!("[event-triggers] Transition channel closed; subscriber stopping"); + break; + } + }; + + let triggers = store.list(); + if triggers.is_empty() { + continue; + } + + let mut to_cancel: Vec = Vec::new(); + + for trigger in &triggers { + if !trigger.predicate.matches(&fired) { + continue; + } + + crate::slog!( + "[event-triggers] Trigger {} matched: story={} {}→{}", + trigger.id, + fired.story_id.0, + crate::pipeline_state::stage_label(&fired.before), + crate::pipeline_state::stage_label(&fired.after), + ); + + match &trigger.action { + TriggerAction::Mcp { method, args } => { + execute_mcp_action(method, args.clone(), &ctx).await; + } + TriggerAction::Prompt { text } => { + execute_prompt_action( + text, + &fired.story_id.0, + &agents, + project_root.as_deref(), + ) + .await; + } + } + + if trigger.mode == FireMode::Once { + to_cancel.push(trigger.id.clone()); + } + } + + if !to_cancel.is_empty() { + store.cancel_batch(&to_cancel); + } + } + }); +} + +/// Execute an Mcp action by dispatching through the full MCP tool dispatch path. +async fn execute_mcp_action(method: &str, args: serde_json::Value, ctx: &AppContext) { + match crate::http::mcp::dispatch::dispatch_tool_call(method, args, ctx).await { + Ok(result) => { + crate::slog!("[event-triggers] Mcp '{method}' succeeded: {result}"); + } + Err(e) => { + crate::slog!("[event-triggers] Mcp '{method}' failed: {e}"); + } + } +} + +/// Execute a Prompt action: create an ephemeral story and start an agent on it. +async fn execute_prompt_action( + text: &str, + triggering_story_id: &str, + agents: &Arc, + project_root: Option<&std::path::Path>, +) { + let Some(root) = project_root else { + crate::slog!("[event-triggers] Prompt action skipped (no project root configured): {text}"); + return; + }; + + // Allocate a new story ID for the ephemeral agent task. + let num = crate::db::ops::next_item_number(); + let story_id = format!("{num}_trigger_task"); + + let content = format!( + "---\nname: Trigger Task\n---\n\ + # Trigger Task\n\n\ + _Auto-created by event trigger (source story: {triggering_story_id})_\n\n\ + ## Task\n\n\ + {text}\n\n\ + ## Acceptance Criteria\n\n\ + - [ ] Complete the task described above and exit.\n" + ); + + crate::db::write_item_with_content( + &story_id, + "1_backlog", + &content, + crate::db::ItemMeta { + name: Some("Trigger Task".to_string()), + ..Default::default() + }, + ); + + if let Err(e) = crate::agents::lifecycle::move_story_to_current(&story_id) { + crate::slog!("[event-triggers] Failed to move {story_id} to current: {e}"); + return; + } + + match agents.start_agent(root, &story_id, None, None, None).await { + Ok(info) => { + crate::slog!( + "[event-triggers] Started agent {} for prompt task {story_id}", + info.agent_name + ); + } + Err(e) => { + crate::slog!("[event-triggers] Failed to start agent for prompt task {story_id}: {e}"); + } + } +} + /// Spawn the startup reconstruction task: replay the current pipeline state /// through the [`TransitionFired`][crate::pipeline_state::TransitionFired] /// broadcast channel so that all existing subscribers (worktree lifecycle,