diff --git a/server/src/pipeline_state/apply.rs b/server/src/pipeline_state/apply.rs index b5f25519..0d31a7ff 100644 --- a/server/src/pipeline_state/apply.rs +++ b/server/src/pipeline_state/apply.rs @@ -6,9 +6,7 @@ //! stage back to the CRDT, and returns a [`TransitionFired`] event for //! downstream subscribers. -use super::{ - PipelineEvent, StoryId, TransitionFired, event_label, read_typed, stage_label, transition, -}; +use super::{PipelineEvent, StoryId, TransitionFired, read_typed, transition}; use chrono::Utc; /// Error type for [`apply_transition`]. @@ -91,14 +89,6 @@ pub fn apply_transition( at: Utc::now(), }; - crate::slog!( - "[pipeline/transition] #{}: {} + {} → {}", - story_id, - stage_label(&fired.before), - event_label(&fired.event), - stage_label(&fired.after), - ); - super::events::try_broadcast(&fired); Ok(fired) } diff --git a/server/src/pipeline_state/mod.rs b/server/src/pipeline_state/mod.rs index 002f4071..21d560c6 100644 --- a/server/src/pipeline_state/mod.rs +++ b/server/src/pipeline_state/mod.rs @@ -62,8 +62,10 @@ pub use apply::{ transition_to_unfrozen, }; +pub use subscribers::spawn_audit_log_subscriber; + #[allow(unused_imports)] pub use subscribers::{ - AutoAssignSubscriber, FileRendererSubscriber, MatrixBotSubscriber, PipelineItemsSubscriber, - WebUiBroadcastSubscriber, + AuditLogSubscriber, AutoAssignSubscriber, FileRendererSubscriber, MatrixBotSubscriber, + PipelineItemsSubscriber, WebUiBroadcastSubscriber, format_audit_entry, }; diff --git a/server/src/pipeline_state/subscribers.rs b/server/src/pipeline_state/subscribers.rs index e8c0bad3..78e9222d 100644 --- a/server/src/pipeline_state/subscribers.rs +++ b/server/src/pipeline_state/subscribers.rs @@ -1,10 +1,63 @@ -//! Concrete subscriber stubs for the event bus. +//! Concrete subscriber stubs for the event bus, plus the production audit-log subscriber. use super::Stage; use super::events::{TransitionFired, TransitionSubscriber}; #[allow(unused_imports)] use super::{event_label, stage_dir_name, stage_label}; +// ── Audit log subscriber ───────────────────────────────────────────────────── + +/// Format a `TransitionFired` event as a structured one-line audit log entry. +/// +/// Fields are in stable `key=value` order separated by spaces: +/// `audit ts= id= from= to= event=` +pub fn format_audit_entry(f: &TransitionFired) -> String { + format!( + "audit ts={} id={} from={} to={} event={}", + f.at.to_rfc3339_opts(chrono::SecondsFormat::Secs, true), + f.story_id, + stage_label(&f.before), + stage_label(&f.after), + event_label(&f.event), + ) +} + +/// Subscriber that writes structured one-line audit entries for every stage transition. +pub struct AuditLogSubscriber; + +impl TransitionSubscriber for AuditLogSubscriber { + fn name(&self) -> &'static str { + "audit-log" + } + + fn on_transition(&self, f: &TransitionFired) { + crate::slog!("{}", format_audit_entry(f)); + } +} + +/// Spawn a background task that writes a structured audit log entry for every pipeline transition. +/// +/// Subscribes to the transition broadcast channel. Every `TransitionFired` event produces +/// one line via [`format_audit_entry`] and writes it to the shared log buffer. +pub fn spawn_audit_log_subscriber() { + let sub = AuditLogSubscriber; + let mut rx = super::events::subscribe_transitions(); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(fired) => sub.on_transition(&fired), + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + crate::slog_warn!( + "[audit-log] Subscriber lagged, skipped {n} event(s); \ + some transitions may be absent from the audit log." + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); +} + // ── Subscriber stubs (real dispatch uses these as the interface) ───────────── // // These are ready to wire into the event bus but not yet connected to the diff --git a/server/src/pipeline_state/tests.rs b/server/src/pipeline_state/tests.rs index 08bb73b3..3cc5c731 100644 --- a/server/src/pipeline_state/tests.rs +++ b/server/src/pipeline_state/tests.rs @@ -1006,4 +1006,205 @@ fn hotfix_requested_rejected_from_non_done_stages() { } } +// ── Audit log subscriber (story 1014) ────────────────────────────────────── + +#[test] +fn audit_entry_backlog_to_coding_exact_format() { + let at = chrono::DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z") + .unwrap() + .with_timezone(&chrono::Utc); + let fired = TransitionFired { + story_id: StoryId("1014_my_story".into()), + before: Stage::Backlog, + after: Stage::Coding { claim: None }, + event: PipelineEvent::DepsMet, + at, + }; + assert_eq!( + format_audit_entry(&fired), + "audit ts=2026-01-01T00:00:00Z id=1014_my_story from=Backlog to=Coding event=DepsMet" + ); +} + +#[test] +fn audit_entry_is_single_line_with_all_fields() { + let fired = TransitionFired { + story_id: StoryId("42_test".into()), + before: Stage::Qa, + after: Stage::Merge { + feature_branch: fb("feature/story-42"), + commits_ahead: nz(3), + claim: None, + }, + event: PipelineEvent::GatesPassed { + feature_branch: fb("feature/story-42"), + commits_ahead: nz(3), + }, + at: chrono::Utc::now(), + }; + let line = format_audit_entry(&fired); + assert!(!line.contains('\n'), "audit entry must be a single line"); + assert!(line.starts_with("audit "), "must start with 'audit '"); + assert!(line.contains("id=42_test"), "must contain id field"); + assert!(line.contains("from=Qa"), "must contain from field"); + assert!(line.contains("to=Merge"), "must contain to field"); + assert!( + line.contains("event=GatesPassed"), + "must contain event field" + ); + // Stable field ordering: ts before id before from before to before event. + let ts_pos = line.find("ts=").unwrap(); + let id_pos = line.find("id=").unwrap(); + let from_pos = line.find("from=").unwrap(); + let to_pos = line.find("to=").unwrap(); + let ev_pos = line.find("event=").unwrap(); + assert!( + ts_pos < id_pos && id_pos < from_pos && from_pos < to_pos && to_pos < ev_pos, + "fields must appear in order ts, id, from, to, event" + ); +} + +#[test] +fn audit_entry_merge_to_done() { + let fired = TransitionFired { + story_id: StoryId("100_s".into()), + before: Stage::Merge { + feature_branch: fb("f"), + commits_ahead: nz(1), + claim: None, + }, + after: Stage::Done { + merged_at: chrono::Utc::now(), + merge_commit: sha("abc"), + }, + event: PipelineEvent::MergeSucceeded { + merge_commit: sha("abc"), + }, + at: chrono::Utc::now(), + }; + let line = format_audit_entry(&fired); + assert!(line.contains("from=Merge"), "from=Merge"); + assert!(line.contains("to=Done"), "to=Done"); + assert!( + line.contains("event=MergeSucceeded"), + "event=MergeSucceeded" + ); +} + +#[test] +fn audit_entry_done_to_archived() { + let fired = TransitionFired { + story_id: StoryId("200_s".into()), + before: Stage::Done { + merged_at: chrono::Utc::now(), + merge_commit: sha("x"), + }, + after: Stage::Archived { + archived_at: chrono::Utc::now(), + reason: ArchiveReason::Completed, + }, + event: PipelineEvent::Accepted, + at: chrono::Utc::now(), + }; + let line = format_audit_entry(&fired); + assert!(line.contains("from=Done"), "from=Done"); + assert!(line.contains("to=Archived"), "to=Archived"); + assert!(line.contains("event=Accepted"), "event=Accepted"); +} + +#[test] +fn audit_entry_coding_to_blocked() { + let fired = TransitionFired { + story_id: StoryId("300_s".into()), + before: Stage::Coding { claim: None }, + after: Stage::Blocked { + reason: "waiting".into(), + }, + event: PipelineEvent::Block { + reason: "waiting".into(), + }, + at: chrono::Utc::now(), + }; + let line = format_audit_entry(&fired); + assert!(line.contains("from=Coding"), "from=Coding"); + assert!(line.contains("to=Blocked"), "to=Blocked"); + assert!(line.contains("event=Block"), "event=Block"); +} + +#[test] +fn audit_entry_blocked_to_coding() { + let fired = TransitionFired { + story_id: StoryId("400_s".into()), + before: Stage::Blocked { + reason: "test".into(), + }, + after: Stage::Coding { claim: None }, + event: PipelineEvent::Unblock, + at: chrono::Utc::now(), + }; + let line = format_audit_entry(&fired); + assert!(line.contains("from=Blocked"), "from=Blocked"); + assert!(line.contains("to=Coding"), "to=Coding"); + assert!(line.contains("event=Unblock"), "event=Unblock"); +} + +#[test] +fn audit_entry_merge_to_merge_failure() { + let fired = TransitionFired { + story_id: StoryId("500_s".into()), + before: Stage::Merge { + feature_branch: fb("f"), + commits_ahead: nz(1), + claim: None, + }, + after: Stage::MergeFailure { + kind: MergeFailureKind::Other("conflicts".into()), + feature_branch: fb("f"), + commits_ahead: nz(1), + }, + event: PipelineEvent::MergeFailed { + kind: MergeFailureKind::Other("conflicts".into()), + }, + at: chrono::Utc::now(), + }; + let line = format_audit_entry(&fired); + assert!(line.contains("from=Merge"), "from=Merge"); + assert!(line.contains("to=MergeFailure"), "to=MergeFailure"); + assert!(line.contains("event=MergeFailed"), "event=MergeFailed"); +} + +#[test] +fn audit_entry_coding_to_frozen() { + let fired = TransitionFired { + story_id: StoryId("600_s".into()), + before: Stage::Coding { claim: None }, + after: Stage::Frozen { + resume_to: Box::new(Stage::Coding { claim: None }), + }, + event: PipelineEvent::Freeze, + at: chrono::Utc::now(), + }; + let line = format_audit_entry(&fired); + assert!(line.contains("from=Coding"), "from=Coding"); + assert!(line.contains("to=Frozen"), "to=Frozen"); + assert!(line.contains("event=Freeze"), "event=Freeze"); +} + +#[test] +fn audit_entry_coding_to_abandoned() { + let fired = TransitionFired { + story_id: StoryId("700_s".into()), + before: Stage::Coding { claim: None }, + after: Stage::Abandoned { + ts: chrono::Utc::now(), + }, + event: PipelineEvent::Abandon, + at: chrono::Utc::now(), + }; + let line = format_audit_entry(&fired); + assert!(line.contains("from=Coding"), "from=Coding"); + assert!(line.contains("to=Abandoned"), "to=Abandoned"); + assert!(line.contains("event=Abandon"), "event=Abandon"); +} + // ── ProjectionError Display ───────────────────────────────────────── diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index eec54053..6ec887ab 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -18,6 +18,9 @@ pub(crate) fn spawn_event_bridges( project_root: Option, agents: Arc, ) { + // Audit log subscriber: write one structured line per pipeline transition. + crate::pipeline_state::spawn_audit_log_subscriber(); + // CRDT → watcher bridge: translate CRDT stage-transition events into // WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign) // see a uniform stream regardless of whether the event originated from the