Compare commits
4 Commits
8f666bd6b3
...
556d335997
| Author | SHA1 | Date | |
|---|---|---|---|
| 556d335997 | |||
| c66016394b | |||
| 23c3301903 | |||
| e6865a1bc6 |
@@ -282,6 +282,9 @@
|
||||
"function parseCodeRefs",
|
||||
"function InlineCodeWithRefs"
|
||||
],
|
||||
"frontend/src/components/ErrorBoundary.tsx": [
|
||||
"class ErrorBoundary"
|
||||
],
|
||||
"frontend/src/components/GatewayPanel.test.tsx": [],
|
||||
"frontend/src/components/GatewayPanel.tsx": [
|
||||
"function StoryRow",
|
||||
@@ -386,6 +389,7 @@
|
||||
"function useChatSend"
|
||||
],
|
||||
"frontend/src/hooks/useChatWebSocket.ts": [
|
||||
"type WsConnectivity",
|
||||
"interface UseChatWebSocketResult",
|
||||
"function useChatWebSocket"
|
||||
],
|
||||
@@ -490,6 +494,7 @@
|
||||
"fn supersede_story",
|
||||
"fn reject_story",
|
||||
"fn move_story_to_stage",
|
||||
"fn transition_merge_failure_to_retry",
|
||||
"fn close_bug_to_archive"
|
||||
],
|
||||
"server/src/agents/local_prompt.rs": [
|
||||
@@ -942,6 +947,10 @@
|
||||
"fn handle_assign"
|
||||
],
|
||||
"server/src/chat/transport/matrix/bot/context.rs": [
|
||||
"const SEEN_EVENT_IDS_CAP",
|
||||
"struct SeenEventIds",
|
||||
"fn new",
|
||||
"fn insert",
|
||||
"struct BotContext",
|
||||
"fn effective_project_root",
|
||||
"fn is_gateway",
|
||||
@@ -971,7 +980,8 @@
|
||||
"fn handle_message"
|
||||
],
|
||||
"server/src/chat/transport/matrix/bot/messages/mod.rs": [
|
||||
"fn format_user_prompt"
|
||||
"fn format_user_prompt",
|
||||
"fn format_drained_events"
|
||||
],
|
||||
"server/src/chat/transport/matrix/bot/messages/on_room_message.rs": [
|
||||
"fn on_room_message"
|
||||
@@ -1418,7 +1428,8 @@
|
||||
"fn migrate_names_from_slugs",
|
||||
"fn migrate_legacy_stage_strings",
|
||||
"fn migrate_node_claims_to_agent_claims",
|
||||
"fn migrate_merge_job"
|
||||
"fn migrate_merge_job",
|
||||
"fn purge_done_stage_merge_jobs"
|
||||
],
|
||||
"server/src/crdt_state/write/mod.rs": [],
|
||||
"server/src/crdt_state/write/tests.rs": [],
|
||||
@@ -1537,6 +1548,7 @@
|
||||
"fn recover_half_written_items"
|
||||
],
|
||||
"server/src/db/shadow_write.rs": [
|
||||
"fn get_shared_pool",
|
||||
"struct PipelineWriteMsg",
|
||||
"struct PipelineDb",
|
||||
"static PIPELINE_DB",
|
||||
@@ -2363,6 +2375,7 @@
|
||||
],
|
||||
"server/src/service/event_triggers/store.rs": [
|
||||
"struct EventTriggerStore",
|
||||
"fn from_pool",
|
||||
"fn load",
|
||||
"fn add",
|
||||
"fn list",
|
||||
@@ -2488,7 +2501,8 @@
|
||||
"fn save_bot_config_and_restart"
|
||||
],
|
||||
"server/src/service/gateway/polling.rs": [
|
||||
"fn format_gateway_event"
|
||||
"fn format_gateway_event",
|
||||
"fn format_gateway_audit_line"
|
||||
],
|
||||
"server/src/service/git_ops/io.rs": [
|
||||
"fn validate_worktree_path",
|
||||
@@ -2774,6 +2788,7 @@
|
||||
],
|
||||
"server/src/service/timer/io.rs": [
|
||||
"struct TimerStore",
|
||||
"fn from_pool",
|
||||
"fn load",
|
||||
"fn add",
|
||||
"fn remove",
|
||||
@@ -2815,6 +2830,7 @@
|
||||
"struct ScheduledTimer",
|
||||
"fn new_id",
|
||||
"struct ScheduledTimerStore",
|
||||
"fn from_pool",
|
||||
"fn load",
|
||||
"fn add",
|
||||
"fn remove_by_id",
|
||||
|
||||
@@ -60,13 +60,6 @@ pub fn read_project_local_prompt(project_root: &Path) -> Option<String> {
|
||||
sections.push((rel_path, trimmed.to_string()));
|
||||
}
|
||||
|
||||
// Regenerate the source map so agents always start from a fresh snapshot.
|
||||
// Failure is non-fatal: log it and fall through to whatever is on disk.
|
||||
let map_path = project_root.join(SOURCE_MAP_REL);
|
||||
if let Err(e) = source_map_gen::regenerate_source_map(project_root, &map_path) {
|
||||
crate::slog!("[agents] source-map regen failed (non-fatal): {}", e);
|
||||
}
|
||||
|
||||
// Read source-map.json (after AGENT.md) with a byte cap.
|
||||
let source_map_content = read_source_map_section(project_root);
|
||||
|
||||
@@ -394,86 +387,6 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
// ── Regen-on-spawn tests ─────────────────────────────────────────────────
|
||||
|
||||
fn init_git_repo(dir: &Path) {
|
||||
let run = |args: &[&str]| {
|
||||
std::process::Command::new("git")
|
||||
.args(args)
|
||||
.current_dir(dir)
|
||||
.output()
|
||||
.unwrap();
|
||||
};
|
||||
run(&["init"]);
|
||||
run(&["config", "user.email", "test@test.com"]);
|
||||
run(&["config", "user.name", "Test"]);
|
||||
run(&["commit", "--allow-empty", "-m", "init"]);
|
||||
}
|
||||
|
||||
/// Happy path: regen runs successfully and the fresh map is included in the bundle.
|
||||
#[test]
|
||||
fn regen_creates_map_on_coder_spawn() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
init_git_repo(tmp.path());
|
||||
|
||||
// Write a tracked Rust file so git ls-files has something to index.
|
||||
write_file(
|
||||
tmp.path(),
|
||||
"lib.rs",
|
||||
"//! Module doc.\n\n/// A function.\npub fn hello() {}\n",
|
||||
);
|
||||
std::process::Command::new("git")
|
||||
.args(["add", "lib.rs"])
|
||||
.current_dir(tmp.path())
|
||||
.output()
|
||||
.unwrap();
|
||||
std::process::Command::new("git")
|
||||
.args(["commit", "-m", "add lib.rs"])
|
||||
.current_dir(tmp.path())
|
||||
.output()
|
||||
.unwrap();
|
||||
|
||||
// Write an orientation file so we get Some back.
|
||||
write_file(tmp.path(), "CLAUDE.md", "agent hints");
|
||||
|
||||
// Map does not exist yet.
|
||||
let map_path = tmp.path().join(SOURCE_MAP_REL);
|
||||
assert!(!map_path.exists(), "map must not exist before spawn");
|
||||
|
||||
let result = read_project_local_prompt(tmp.path());
|
||||
assert!(
|
||||
result.is_some(),
|
||||
"bundle must be Some when CLAUDE.md present"
|
||||
);
|
||||
|
||||
// Regen should have written the map.
|
||||
assert!(
|
||||
map_path.exists(),
|
||||
"regen must have written source-map.json during spawn"
|
||||
);
|
||||
}
|
||||
|
||||
/// Fallback: regen fails (no git repo) but a stale map on disk is still read.
|
||||
#[test]
|
||||
fn regen_fails_stale_map_still_readable() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
// No git repo — regen will fail with "git ls-files" error.
|
||||
|
||||
write_file(tmp.path(), "CLAUDE.md", "agent hints");
|
||||
// Write a stale map manually.
|
||||
write_file(
|
||||
tmp.path(),
|
||||
SOURCE_MAP_REL,
|
||||
r#"{"stale/entry.rs": ["fn old"]}"#,
|
||||
);
|
||||
|
||||
let result = read_project_local_prompt(tmp.path()).unwrap();
|
||||
assert!(
|
||||
result.contains("stale/entry.rs"),
|
||||
"stale map must still be readable after regen failure: {result}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[allow(clippy::string_slice)] // sm_start is derived from str::find — always a char boundary
|
||||
fn source_map_truncated_at_byte_cap() {
|
||||
|
||||
@@ -361,6 +361,54 @@ pub(crate) fn run_squash_merge(
|
||||
"=== Verified: cherry-pick landed on '{base_branch}' with code changes ===\n"
|
||||
));
|
||||
|
||||
// ── Regen source-map.json on master after cherry-pick ─────────
|
||||
// Run deterministically on project_root (now on master). Skip the commit
|
||||
// when regen produces no diff (idempotent case). Failure is non-fatal.
|
||||
{
|
||||
let map_path = project_root.join(".huskies").join("source-map.json");
|
||||
let old_content = std::fs::read_to_string(&map_path).ok();
|
||||
match source_map_gen::regenerate_source_map(project_root, &map_path) {
|
||||
Err(e) => {
|
||||
all_output.push_str(&format!(
|
||||
"=== source-map regen failed (non-fatal): {e} ===\n"
|
||||
));
|
||||
}
|
||||
Ok(()) => {
|
||||
let new_content = std::fs::read_to_string(&map_path).ok();
|
||||
if old_content != new_content {
|
||||
all_output.push_str("=== source-map.json changed — committing on master ===\n");
|
||||
let _ = Command::new("git")
|
||||
.args(["add", ".huskies/source-map.json"])
|
||||
.current_dir(project_root)
|
||||
.output();
|
||||
match Command::new("git")
|
||||
.args(["commit", "-m", "huskies: regen source-map.json"])
|
||||
.current_dir(project_root)
|
||||
.output()
|
||||
{
|
||||
Ok(c) if c.status.success() => {
|
||||
all_output.push_str("=== source-map.json committed on master ===\n");
|
||||
}
|
||||
Ok(c) => {
|
||||
let stderr = String::from_utf8_lossy(&c.stderr);
|
||||
all_output.push_str(&format!(
|
||||
"=== source-map commit failed (non-fatal): {stderr} ===\n"
|
||||
));
|
||||
}
|
||||
Err(e) => {
|
||||
all_output.push_str(&format!(
|
||||
"=== source-map commit error (non-fatal): {e} ===\n"
|
||||
));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
all_output
|
||||
.push_str("=== source-map.json unchanged — no follow-up commit ===\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Clean up ──────────────────────────────────────────────────
|
||||
cleanup_merge_workspace(project_root, &merge_wt_path, &merge_branch);
|
||||
all_output.push_str("=== Merge-queue cleanup complete ===\n");
|
||||
|
||||
@@ -403,6 +403,112 @@ fn squash_merge_runs_component_setup_from_project_toml() {
|
||||
);
|
||||
}
|
||||
|
||||
/// AC6: the regen+commit step runs on `project_root` (master) only.
|
||||
/// After a successful merge where the source-map changes, `git log --name-only`
|
||||
/// shows a follow-up commit whose diff contains ONLY `.huskies/source-map.json`.
|
||||
#[tokio::test]
|
||||
async fn regen_commit_on_master_touches_only_source_map() {
|
||||
use std::fs;
|
||||
use tempfile::tempdir;
|
||||
|
||||
let tmp = tempdir().unwrap();
|
||||
let repo = tmp.path();
|
||||
init_git_repo(repo);
|
||||
|
||||
// Put a stale source-map.json on master so regen will produce a different result.
|
||||
let sk_dir = repo.join(".huskies");
|
||||
fs::create_dir_all(&sk_dir).unwrap();
|
||||
fs::write(sk_dir.join("source-map.json"), "{\"stale\": true}\n").unwrap();
|
||||
|
||||
// Add a tracked Rust file so the regenerator has something to index.
|
||||
fs::create_dir_all(repo.join("src")).unwrap();
|
||||
fs::write(
|
||||
repo.join("src/lib.rs"),
|
||||
"//! Library.\n\n/// Says hello.\npub fn hello() {}\n",
|
||||
)
|
||||
.unwrap();
|
||||
Command::new("git")
|
||||
.args(["add", "."])
|
||||
.current_dir(repo)
|
||||
.output()
|
||||
.unwrap();
|
||||
Command::new("git")
|
||||
.args(["commit", "-m", "initial with stale source-map"])
|
||||
.current_dir(repo)
|
||||
.output()
|
||||
.unwrap();
|
||||
|
||||
// Feature branch: add a new file.
|
||||
Command::new("git")
|
||||
.args(["checkout", "-b", "feature/story-1065_regen_test"])
|
||||
.current_dir(repo)
|
||||
.output()
|
||||
.unwrap();
|
||||
fs::write(
|
||||
repo.join("src/extra.rs"),
|
||||
"//! Extra.\n\n/// Extra fn.\npub fn extra() {}\n",
|
||||
)
|
||||
.unwrap();
|
||||
Command::new("git")
|
||||
.args(["add", "."])
|
||||
.current_dir(repo)
|
||||
.output()
|
||||
.unwrap();
|
||||
Command::new("git")
|
||||
.args(["commit", "-m", "add extra.rs"])
|
||||
.current_dir(repo)
|
||||
.output()
|
||||
.unwrap();
|
||||
Command::new("git")
|
||||
.args(["checkout", "master"])
|
||||
.current_dir(repo)
|
||||
.output()
|
||||
.unwrap();
|
||||
|
||||
let result =
|
||||
run_squash_merge(repo, "feature/story-1065_regen_test", "1065_regen_test").unwrap();
|
||||
|
||||
assert!(
|
||||
matches!(result, super::MergeResult::Success { .. }),
|
||||
"clean merge must succeed; got: {result:?}"
|
||||
);
|
||||
|
||||
// Find the regen commit if one was created.
|
||||
let log_out = Command::new("git")
|
||||
.args(["log", "--oneline", "--name-only"])
|
||||
.current_dir(repo)
|
||||
.output()
|
||||
.unwrap();
|
||||
let log = String::from_utf8_lossy(&log_out.stdout);
|
||||
|
||||
// If a regen commit exists, its diff must contain ONLY the source-map path.
|
||||
if log.contains("huskies: regen source-map.json") {
|
||||
// Extract files changed in the regen commit.
|
||||
let show_out = Command::new("git")
|
||||
.args(["show", "--name-only", "--format=", "HEAD"])
|
||||
.current_dir(repo)
|
||||
.output()
|
||||
.unwrap();
|
||||
let show = String::from_utf8_lossy(&show_out.stdout);
|
||||
|
||||
// If HEAD is the regen commit, its files list must be exactly one entry.
|
||||
let head_msg = Command::new("git")
|
||||
.args(["log", "-1", "--format=%s"])
|
||||
.current_dir(repo)
|
||||
.output()
|
||||
.unwrap();
|
||||
let head_subject = String::from_utf8_lossy(&head_msg.stdout);
|
||||
if head_subject.trim() == "huskies: regen source-map.json" {
|
||||
let changed_files: Vec<&str> = show.lines().filter(|l| !l.is_empty()).collect();
|
||||
assert_eq!(
|
||||
changed_files,
|
||||
vec![".huskies/source-map.json"],
|
||||
"regen commit must touch ONLY .huskies/source-map.json; got: {changed_files:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn squash_merge_succeeds_without_components_in_project_toml() {
|
||||
|
||||
@@ -101,6 +101,13 @@ pub struct BotContext {
|
||||
/// `<system-reminder>` block at the head of the next user prompt so Timmy
|
||||
/// sees pipeline activity without requiring a separate message.
|
||||
pub pending_pipeline_events: Arc<TokioMutex<Vec<String>>>,
|
||||
/// Gateway aggregate transition events buffered since the last LLM turn.
|
||||
///
|
||||
/// In gateway mode a background task appends one compact audit line per
|
||||
/// `GatewayStatusEvent` received from the gateway broadcaster. Drained
|
||||
/// alongside `pending_pipeline_events` on each user message. Always
|
||||
/// empty in standalone (non-gateway) mode.
|
||||
pub pending_gateway_events: Arc<TokioMutex<Vec<String>>>,
|
||||
/// Bounded FIFO set of already-handled incoming event IDs.
|
||||
///
|
||||
/// The Matrix sync loop can replay events on reconnect. This set ensures
|
||||
@@ -293,6 +300,7 @@ mod tests {
|
||||
gateway_projects,
|
||||
gateway_project_urls,
|
||||
pending_pipeline_events: Arc::new(TokioMutex::new(Vec::new())),
|
||||
pending_gateway_events: Arc::new(TokioMutex::new(Vec::new())),
|
||||
handled_incoming_event_ids: Arc::new(TokioMutex::new(SeenEventIds::new(
|
||||
SEEN_EVENT_IDS_CAP,
|
||||
))),
|
||||
|
||||
@@ -13,7 +13,7 @@ use super::super::context::BotContext;
|
||||
use super::super::format::markdown_to_html;
|
||||
use super::super::history::{ConversationEntry, ConversationRole, save_history};
|
||||
|
||||
use super::format_user_prompt;
|
||||
use super::{format_drained_events, format_user_prompt};
|
||||
|
||||
pub(in crate::chat::transport::matrix::bot) async fn handle_message(
|
||||
room_id_str: String,
|
||||
@@ -30,28 +30,18 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message(
|
||||
guard.get(&room_id).and_then(|conv| conv.session_id.clone())
|
||||
};
|
||||
|
||||
// Drain any pipeline transition events buffered since the last LLM turn and
|
||||
// prepend them as a passive <system-reminder> block so Timmy sees pipeline
|
||||
// activity without requiring a separate message. Capped at 20 lines to
|
||||
// keep context size bounded.
|
||||
const MAX_PIPELINE_EVENTS: usize = 20;
|
||||
// Drain pipeline and gateway transition events buffered since the last LLM
|
||||
// turn and prepend them as a passive <system-reminder> block so Timmy sees
|
||||
// pipeline activity without requiring a separate message. Sled events come
|
||||
// from `pending_pipeline_events`; gateway events from `pending_gateway_events`.
|
||||
// In practice only one buffer is non-empty (sled mode vs gateway mode).
|
||||
let system_reminder_prefix = {
|
||||
let mut guard = ctx.pending_pipeline_events.lock().await;
|
||||
if guard.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
let total = guard.len();
|
||||
let lines: Vec<String> = guard.drain(..).collect();
|
||||
drop(guard);
|
||||
let shown_count = total.min(MAX_PIPELINE_EVENTS);
|
||||
let shown = lines[..shown_count].join("\n");
|
||||
let tail = if total > MAX_PIPELINE_EVENTS {
|
||||
format!("\n...and {} more", total - MAX_PIPELINE_EVENTS)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
format!("<system-reminder>\n{shown}{tail}\n</system-reminder>\n")
|
||||
}
|
||||
let mut sled_guard = ctx.pending_pipeline_events.lock().await;
|
||||
let mut gtw_guard = ctx.pending_gateway_events.lock().await;
|
||||
let all_lines: Vec<String> = sled_guard.drain(..).chain(gtw_guard.drain(..)).collect();
|
||||
drop(sled_guard);
|
||||
drop(gtw_guard);
|
||||
format_drained_events(all_lines)
|
||||
};
|
||||
|
||||
// The prompt is just the current message with sender attribution.
|
||||
|
||||
@@ -11,6 +11,27 @@ pub(super) fn format_user_prompt(sender: &str, message: &str) -> String {
|
||||
format!("{sender}: {message}")
|
||||
}
|
||||
|
||||
/// Drain `lines` into a `<system-reminder>` block for injection at the head of
|
||||
/// the next LLM prompt. Returns an empty string when `lines` is empty.
|
||||
///
|
||||
/// At most 20 lines are shown verbatim; excess lines are replaced with a
|
||||
/// `…and N more` indicator to keep context size bounded.
|
||||
pub(in crate::chat::transport::matrix::bot) fn format_drained_events(lines: Vec<String>) -> String {
|
||||
if lines.is_empty() {
|
||||
return String::new();
|
||||
}
|
||||
const MAX_PIPELINE_EVENTS: usize = 20;
|
||||
let total = lines.len();
|
||||
let shown_count = total.min(MAX_PIPELINE_EVENTS);
|
||||
let shown = lines[..shown_count].join("\n");
|
||||
let tail = if total > MAX_PIPELINE_EVENTS {
|
||||
format!("\n...and {} more", total - MAX_PIPELINE_EVENTS)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
format!("<system-reminder>\n{shown}{tail}\n</system-reminder>\n")
|
||||
}
|
||||
|
||||
/// Matrix event handler for room messages. Each invocation spawns an
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
@@ -51,6 +72,49 @@ mod tests {
|
||||
assert!(crate::llm::oauth::extract_login_url_from_error(err).is_none());
|
||||
}
|
||||
|
||||
// -- format_drained_events ----------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn format_drained_events_empty_returns_empty_string() {
|
||||
assert_eq!(format_drained_events(vec![]), String::new());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn format_drained_events_wraps_in_system_reminder() {
|
||||
let result = format_drained_events(vec!["audit ts=2026 id=1 event=x".to_string()]);
|
||||
assert!(result.starts_with("<system-reminder>\n"), "got: {result}");
|
||||
assert!(result.ends_with("</system-reminder>\n"), "got: {result}");
|
||||
assert!(
|
||||
result.contains("audit ts=2026 id=1 event=x"),
|
||||
"got: {result}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn format_drained_events_caps_at_20_with_overflow_indicator() {
|
||||
let lines: Vec<String> = (0..25).map(|i| format!("line {i}")).collect();
|
||||
let result = format_drained_events(lines);
|
||||
assert!(result.contains("...and 5 more"), "got: {result}");
|
||||
assert!(
|
||||
result.contains("line 19"),
|
||||
"last shown line missing; got: {result}"
|
||||
);
|
||||
assert!(
|
||||
!result.contains("line 20"),
|
||||
"line 21 must be hidden; got: {result}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn format_drained_events_exactly_20_no_overflow_indicator() {
|
||||
let lines: Vec<String> = (0..20).map(|i| format!("line {i}")).collect();
|
||||
let result = format_drained_events(lines);
|
||||
assert!(
|
||||
!result.contains("...and"),
|
||||
"must not show overflow when exactly 20; got: {result}"
|
||||
);
|
||||
}
|
||||
|
||||
// -- bot_name / system prompt -------------------------------------------
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -325,6 +325,38 @@ pub async fn run_bot(
|
||||
});
|
||||
}
|
||||
|
||||
// Subscribe to gateway-side status events and buffer compact audit lines for
|
||||
// the LLM context. A separate resubscribed receiver is used so both the
|
||||
// buffer task and the room-forwarder task receive every event independently.
|
||||
let pending_gateway_events: Arc<TokioMutex<Vec<String>>> =
|
||||
Arc::new(TokioMutex::new(Vec::new()));
|
||||
let gateway_event_rx_for_forwarder = if let Some(event_rx) = gateway_event_rx {
|
||||
// Buffer task: silently accumulate compact audit lines for Timmy's context.
|
||||
{
|
||||
use crate::service::gateway::polling::format_gateway_audit_line;
|
||||
let buf_rx = event_rx.resubscribe();
|
||||
let buf = Arc::clone(&pending_gateway_events);
|
||||
tokio::spawn(async move {
|
||||
let mut rx = buf_rx;
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(event) => {
|
||||
let line = format_gateway_audit_line(&event.project, &event.event);
|
||||
buf.lock().await.push(line);
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
slog!("[matrix-bot] gateway event buffer lagged by {n} events");
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
Some(event_rx)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let ctx = BotContext {
|
||||
services,
|
||||
matrix_user_id: bot_user_id,
|
||||
@@ -340,6 +372,7 @@ pub async fn run_bot(
|
||||
gateway_projects,
|
||||
gateway_project_urls,
|
||||
pending_pipeline_events,
|
||||
pending_gateway_events,
|
||||
handled_incoming_event_ids: Arc::new(TokioMutex::new(super::context::SeenEventIds::new(
|
||||
super::context::SEEN_EVENT_IDS_CAP,
|
||||
))),
|
||||
@@ -379,13 +412,8 @@ pub async fn run_bot(
|
||||
);
|
||||
}
|
||||
|
||||
// In gateway mode, subscribe to the gateway-side status broadcaster and
|
||||
// forward events to the configured Matrix rooms with a `[project-name]` prefix.
|
||||
// This path delivers events pushed directly by project nodes over WebSocket
|
||||
// (via `/gateway/events/push`), complementing the HTTP-polling path above.
|
||||
// On broadcaster back-pressure (Lagged), the task re-subscribes automatically
|
||||
// so it never permanently stalls.
|
||||
if let Some(event_rx) = gateway_event_rx {
|
||||
// Forwarder task: post gateway events to Matrix rooms with `[project-name]` prefix.
|
||||
if let Some(event_rx) = gateway_event_rx_for_forwarder {
|
||||
let broadcast_room_ids: Vec<String> =
|
||||
announce_room_ids.iter().map(|r| r.to_string()).collect();
|
||||
crate::gateway::spawn_gateway_broadcaster_forwarder(
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//! with `[project-name]` prefixes. The actual I/O (HTTP polling, spawning
|
||||
//! tasks, sending messages) lives in `io.rs`.
|
||||
|
||||
use crate::pipeline_state::Stage;
|
||||
use crate::pipeline_state::{Stage, stage_label};
|
||||
use crate::service::events::StoredEvent;
|
||||
use crate::service::notifications::{
|
||||
format_blocked_notification, format_error_notification, format_stage_notification,
|
||||
@@ -52,6 +52,46 @@ pub fn format_gateway_event(project_name: &str, event: &StoredEvent) -> (String,
|
||||
}
|
||||
}
|
||||
|
||||
/// Format a [`StoredEvent`] from a project as a compact audit line for LLM context.
|
||||
///
|
||||
/// Produces a structured one-line entry with stable `key=value` fields, including
|
||||
/// the project name, mirroring the sled-side `format_audit_entry` format.
|
||||
pub fn format_gateway_audit_line(project: &str, event: &StoredEvent) -> String {
|
||||
let ts_ms = event.timestamp_ms();
|
||||
let ts = chrono::DateTime::from_timestamp_millis(ts_ms as i64)
|
||||
.unwrap_or_else(chrono::Utc::now)
|
||||
.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
|
||||
|
||||
match event {
|
||||
StoredEvent::StageTransition {
|
||||
story_id,
|
||||
from_stage,
|
||||
to_stage,
|
||||
..
|
||||
} => {
|
||||
let from_label = stage_label(&Stage::from_dir(from_stage).unwrap_or(Stage::Upcoming));
|
||||
let to_label = stage_label(&Stage::from_dir(to_stage).unwrap_or(Stage::Upcoming));
|
||||
format!(
|
||||
"audit ts={ts} project={project} id={story_id} from={from_label} to={to_label} event=stage_transition"
|
||||
)
|
||||
}
|
||||
StoredEvent::MergeFailure {
|
||||
story_id, reason, ..
|
||||
} => {
|
||||
format!(
|
||||
"audit ts={ts} project={project} id={story_id} event=merge_failure reason={reason}"
|
||||
)
|
||||
}
|
||||
StoredEvent::StoryBlocked {
|
||||
story_id, reason, ..
|
||||
} => {
|
||||
format!(
|
||||
"audit ts={ts} project={project} id={story_id} event=story_blocked reason={reason}"
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Tests ────────────────────────────────────────────────────────────────────
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -188,4 +228,64 @@ mod tests {
|
||||
"should not have double spaces from empty name; got: {plain}"
|
||||
);
|
||||
}
|
||||
|
||||
// -- format_gateway_audit_line -------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn audit_line_stage_transition_contains_project_and_stages() {
|
||||
let event = StoredEvent::StageTransition {
|
||||
story_id: "42_story_feat".to_string(),
|
||||
story_name: String::new(),
|
||||
from_stage: "2_current".to_string(),
|
||||
to_stage: "3_qa".to_string(),
|
||||
timestamp_ms: 1_000_000,
|
||||
};
|
||||
let line = format_gateway_audit_line("huskies", &event);
|
||||
assert!(
|
||||
line.starts_with("audit ts="),
|
||||
"must start with audit ts=; got: {line}"
|
||||
);
|
||||
assert!(
|
||||
line.contains("project=huskies"),
|
||||
"must contain project; got: {line}"
|
||||
);
|
||||
assert!(
|
||||
line.contains("id=42_story_feat"),
|
||||
"must contain story id; got: {line}"
|
||||
);
|
||||
assert!(
|
||||
line.contains("event=stage_transition"),
|
||||
"must name event; got: {line}"
|
||||
);
|
||||
assert!(line.contains("from="), "must have from field; got: {line}");
|
||||
assert!(line.contains("to="), "must have to field; got: {line}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn audit_line_merge_failure_contains_reason() {
|
||||
let event = StoredEvent::MergeFailure {
|
||||
story_id: "7_story_bar".to_string(),
|
||||
story_name: String::new(),
|
||||
reason: "conflict in main.rs".to_string(),
|
||||
timestamp_ms: 2_000_000,
|
||||
};
|
||||
let line = format_gateway_audit_line("robot-studio", &event);
|
||||
assert!(line.contains("project=robot-studio"), "got: {line}");
|
||||
assert!(line.contains("event=merge_failure"), "got: {line}");
|
||||
assert!(line.contains("reason=conflict in main.rs"), "got: {line}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn audit_line_story_blocked_contains_reason() {
|
||||
let event = StoredEvent::StoryBlocked {
|
||||
story_id: "3_story_baz".to_string(),
|
||||
story_name: String::new(),
|
||||
reason: "retry limit exceeded".to_string(),
|
||||
timestamp_ms: 3_000_000,
|
||||
};
|
||||
let line = format_gateway_audit_line("proj", &event);
|
||||
assert!(line.contains("project=proj"), "got: {line}");
|
||||
assert!(line.contains("event=story_blocked"), "got: {line}");
|
||||
assert!(line.contains("reason=retry limit exceeded"), "got: {line}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -330,9 +330,8 @@ pub(crate) fn spawn_event_trigger_subscriber(
|
||||
Ok(f) => f,
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
crate::slog!(
|
||||
"[event-triggers] Lagged {n} transition events; replaying pipeline state to recover"
|
||||
"[event-triggers] Lagged {n} transition events; some triggers may have been skipped"
|
||||
);
|
||||
crate::pipeline_state::replay_current_pipeline_state();
|
||||
continue;
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
|
||||
Reference in New Issue
Block a user