huskies: merge 1014
This commit is contained in:
@@ -6,9 +6,7 @@
|
|||||||
//! stage back to the CRDT, and returns a [`TransitionFired`] event for
|
//! stage back to the CRDT, and returns a [`TransitionFired`] event for
|
||||||
//! downstream subscribers.
|
//! downstream subscribers.
|
||||||
|
|
||||||
use super::{
|
use super::{PipelineEvent, StoryId, TransitionFired, read_typed, transition};
|
||||||
PipelineEvent, StoryId, TransitionFired, event_label, read_typed, stage_label, transition,
|
|
||||||
};
|
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
|
|
||||||
/// Error type for [`apply_transition`].
|
/// Error type for [`apply_transition`].
|
||||||
@@ -91,14 +89,6 @@ pub fn apply_transition(
|
|||||||
at: Utc::now(),
|
at: Utc::now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
crate::slog!(
|
|
||||||
"[pipeline/transition] #{}: {} + {} → {}",
|
|
||||||
story_id,
|
|
||||||
stage_label(&fired.before),
|
|
||||||
event_label(&fired.event),
|
|
||||||
stage_label(&fired.after),
|
|
||||||
);
|
|
||||||
|
|
||||||
super::events::try_broadcast(&fired);
|
super::events::try_broadcast(&fired);
|
||||||
Ok(fired)
|
Ok(fired)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -62,8 +62,10 @@ pub use apply::{
|
|||||||
transition_to_unfrozen,
|
transition_to_unfrozen,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub use subscribers::spawn_audit_log_subscriber;
|
||||||
|
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
pub use subscribers::{
|
pub use subscribers::{
|
||||||
AutoAssignSubscriber, FileRendererSubscriber, MatrixBotSubscriber, PipelineItemsSubscriber,
|
AuditLogSubscriber, AutoAssignSubscriber, FileRendererSubscriber, MatrixBotSubscriber,
|
||||||
WebUiBroadcastSubscriber,
|
PipelineItemsSubscriber, WebUiBroadcastSubscriber, format_audit_entry,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,10 +1,63 @@
|
|||||||
//! Concrete subscriber stubs for the event bus.
|
//! Concrete subscriber stubs for the event bus, plus the production audit-log subscriber.
|
||||||
|
|
||||||
use super::Stage;
|
use super::Stage;
|
||||||
use super::events::{TransitionFired, TransitionSubscriber};
|
use super::events::{TransitionFired, TransitionSubscriber};
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
use super::{event_label, stage_dir_name, stage_label};
|
use super::{event_label, stage_dir_name, stage_label};
|
||||||
|
|
||||||
|
// ── Audit log subscriber ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Format a `TransitionFired` event as a structured one-line audit log entry.
|
||||||
|
///
|
||||||
|
/// Fields are in stable `key=value` order separated by spaces:
|
||||||
|
/// `audit ts=<ISO8601> id=<story_id> from=<from_stage> to=<to_stage> event=<event_label>`
|
||||||
|
pub fn format_audit_entry(f: &TransitionFired) -> String {
|
||||||
|
format!(
|
||||||
|
"audit ts={} id={} from={} to={} event={}",
|
||||||
|
f.at.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
|
||||||
|
f.story_id,
|
||||||
|
stage_label(&f.before),
|
||||||
|
stage_label(&f.after),
|
||||||
|
event_label(&f.event),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscriber that writes structured one-line audit entries for every stage transition.
|
||||||
|
pub struct AuditLogSubscriber;
|
||||||
|
|
||||||
|
impl TransitionSubscriber for AuditLogSubscriber {
|
||||||
|
fn name(&self) -> &'static str {
|
||||||
|
"audit-log"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_transition(&self, f: &TransitionFired) {
|
||||||
|
crate::slog!("{}", format_audit_entry(f));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn a background task that writes a structured audit log entry for every pipeline transition.
|
||||||
|
///
|
||||||
|
/// Subscribes to the transition broadcast channel. Every `TransitionFired` event produces
|
||||||
|
/// one line via [`format_audit_entry`] and writes it to the shared log buffer.
|
||||||
|
pub fn spawn_audit_log_subscriber() {
|
||||||
|
let sub = AuditLogSubscriber;
|
||||||
|
let mut rx = super::events::subscribe_transitions();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match rx.recv().await {
|
||||||
|
Ok(fired) => sub.on_transition(&fired),
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
crate::slog_warn!(
|
||||||
|
"[audit-log] Subscriber lagged, skipped {n} event(s); \
|
||||||
|
some transitions may be absent from the audit log."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// ── Subscriber stubs (real dispatch uses these as the interface) ─────────────
|
// ── Subscriber stubs (real dispatch uses these as the interface) ─────────────
|
||||||
//
|
//
|
||||||
// These are ready to wire into the event bus but not yet connected to the
|
// These are ready to wire into the event bus but not yet connected to the
|
||||||
|
|||||||
@@ -1006,4 +1006,205 @@ fn hotfix_requested_rejected_from_non_done_stages() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Audit log subscriber (story 1014) ──────────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn audit_entry_backlog_to_coding_exact_format() {
|
||||||
|
let at = chrono::DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")
|
||||||
|
.unwrap()
|
||||||
|
.with_timezone(&chrono::Utc);
|
||||||
|
let fired = TransitionFired {
|
||||||
|
story_id: StoryId("1014_my_story".into()),
|
||||||
|
before: Stage::Backlog,
|
||||||
|
after: Stage::Coding { claim: None },
|
||||||
|
event: PipelineEvent::DepsMet,
|
||||||
|
at,
|
||||||
|
};
|
||||||
|
assert_eq!(
|
||||||
|
format_audit_entry(&fired),
|
||||||
|
"audit ts=2026-01-01T00:00:00Z id=1014_my_story from=Backlog to=Coding event=DepsMet"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn audit_entry_is_single_line_with_all_fields() {
|
||||||
|
let fired = TransitionFired {
|
||||||
|
story_id: StoryId("42_test".into()),
|
||||||
|
before: Stage::Qa,
|
||||||
|
after: Stage::Merge {
|
||||||
|
feature_branch: fb("feature/story-42"),
|
||||||
|
commits_ahead: nz(3),
|
||||||
|
claim: None,
|
||||||
|
},
|
||||||
|
event: PipelineEvent::GatesPassed {
|
||||||
|
feature_branch: fb("feature/story-42"),
|
||||||
|
commits_ahead: nz(3),
|
||||||
|
},
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
};
|
||||||
|
let line = format_audit_entry(&fired);
|
||||||
|
assert!(!line.contains('\n'), "audit entry must be a single line");
|
||||||
|
assert!(line.starts_with("audit "), "must start with 'audit '");
|
||||||
|
assert!(line.contains("id=42_test"), "must contain id field");
|
||||||
|
assert!(line.contains("from=Qa"), "must contain from field");
|
||||||
|
assert!(line.contains("to=Merge"), "must contain to field");
|
||||||
|
assert!(
|
||||||
|
line.contains("event=GatesPassed"),
|
||||||
|
"must contain event field"
|
||||||
|
);
|
||||||
|
// Stable field ordering: ts before id before from before to before event.
|
||||||
|
let ts_pos = line.find("ts=").unwrap();
|
||||||
|
let id_pos = line.find("id=").unwrap();
|
||||||
|
let from_pos = line.find("from=").unwrap();
|
||||||
|
let to_pos = line.find("to=").unwrap();
|
||||||
|
let ev_pos = line.find("event=").unwrap();
|
||||||
|
assert!(
|
||||||
|
ts_pos < id_pos && id_pos < from_pos && from_pos < to_pos && to_pos < ev_pos,
|
||||||
|
"fields must appear in order ts, id, from, to, event"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn audit_entry_merge_to_done() {
|
||||||
|
let fired = TransitionFired {
|
||||||
|
story_id: StoryId("100_s".into()),
|
||||||
|
before: Stage::Merge {
|
||||||
|
feature_branch: fb("f"),
|
||||||
|
commits_ahead: nz(1),
|
||||||
|
claim: None,
|
||||||
|
},
|
||||||
|
after: Stage::Done {
|
||||||
|
merged_at: chrono::Utc::now(),
|
||||||
|
merge_commit: sha("abc"),
|
||||||
|
},
|
||||||
|
event: PipelineEvent::MergeSucceeded {
|
||||||
|
merge_commit: sha("abc"),
|
||||||
|
},
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
};
|
||||||
|
let line = format_audit_entry(&fired);
|
||||||
|
assert!(line.contains("from=Merge"), "from=Merge");
|
||||||
|
assert!(line.contains("to=Done"), "to=Done");
|
||||||
|
assert!(
|
||||||
|
line.contains("event=MergeSucceeded"),
|
||||||
|
"event=MergeSucceeded"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn audit_entry_done_to_archived() {
|
||||||
|
let fired = TransitionFired {
|
||||||
|
story_id: StoryId("200_s".into()),
|
||||||
|
before: Stage::Done {
|
||||||
|
merged_at: chrono::Utc::now(),
|
||||||
|
merge_commit: sha("x"),
|
||||||
|
},
|
||||||
|
after: Stage::Archived {
|
||||||
|
archived_at: chrono::Utc::now(),
|
||||||
|
reason: ArchiveReason::Completed,
|
||||||
|
},
|
||||||
|
event: PipelineEvent::Accepted,
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
};
|
||||||
|
let line = format_audit_entry(&fired);
|
||||||
|
assert!(line.contains("from=Done"), "from=Done");
|
||||||
|
assert!(line.contains("to=Archived"), "to=Archived");
|
||||||
|
assert!(line.contains("event=Accepted"), "event=Accepted");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn audit_entry_coding_to_blocked() {
|
||||||
|
let fired = TransitionFired {
|
||||||
|
story_id: StoryId("300_s".into()),
|
||||||
|
before: Stage::Coding { claim: None },
|
||||||
|
after: Stage::Blocked {
|
||||||
|
reason: "waiting".into(),
|
||||||
|
},
|
||||||
|
event: PipelineEvent::Block {
|
||||||
|
reason: "waiting".into(),
|
||||||
|
},
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
};
|
||||||
|
let line = format_audit_entry(&fired);
|
||||||
|
assert!(line.contains("from=Coding"), "from=Coding");
|
||||||
|
assert!(line.contains("to=Blocked"), "to=Blocked");
|
||||||
|
assert!(line.contains("event=Block"), "event=Block");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn audit_entry_blocked_to_coding() {
|
||||||
|
let fired = TransitionFired {
|
||||||
|
story_id: StoryId("400_s".into()),
|
||||||
|
before: Stage::Blocked {
|
||||||
|
reason: "test".into(),
|
||||||
|
},
|
||||||
|
after: Stage::Coding { claim: None },
|
||||||
|
event: PipelineEvent::Unblock,
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
};
|
||||||
|
let line = format_audit_entry(&fired);
|
||||||
|
assert!(line.contains("from=Blocked"), "from=Blocked");
|
||||||
|
assert!(line.contains("to=Coding"), "to=Coding");
|
||||||
|
assert!(line.contains("event=Unblock"), "event=Unblock");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn audit_entry_merge_to_merge_failure() {
|
||||||
|
let fired = TransitionFired {
|
||||||
|
story_id: StoryId("500_s".into()),
|
||||||
|
before: Stage::Merge {
|
||||||
|
feature_branch: fb("f"),
|
||||||
|
commits_ahead: nz(1),
|
||||||
|
claim: None,
|
||||||
|
},
|
||||||
|
after: Stage::MergeFailure {
|
||||||
|
kind: MergeFailureKind::Other("conflicts".into()),
|
||||||
|
feature_branch: fb("f"),
|
||||||
|
commits_ahead: nz(1),
|
||||||
|
},
|
||||||
|
event: PipelineEvent::MergeFailed {
|
||||||
|
kind: MergeFailureKind::Other("conflicts".into()),
|
||||||
|
},
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
};
|
||||||
|
let line = format_audit_entry(&fired);
|
||||||
|
assert!(line.contains("from=Merge"), "from=Merge");
|
||||||
|
assert!(line.contains("to=MergeFailure"), "to=MergeFailure");
|
||||||
|
assert!(line.contains("event=MergeFailed"), "event=MergeFailed");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn audit_entry_coding_to_frozen() {
|
||||||
|
let fired = TransitionFired {
|
||||||
|
story_id: StoryId("600_s".into()),
|
||||||
|
before: Stage::Coding { claim: None },
|
||||||
|
after: Stage::Frozen {
|
||||||
|
resume_to: Box::new(Stage::Coding { claim: None }),
|
||||||
|
},
|
||||||
|
event: PipelineEvent::Freeze,
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
};
|
||||||
|
let line = format_audit_entry(&fired);
|
||||||
|
assert!(line.contains("from=Coding"), "from=Coding");
|
||||||
|
assert!(line.contains("to=Frozen"), "to=Frozen");
|
||||||
|
assert!(line.contains("event=Freeze"), "event=Freeze");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn audit_entry_coding_to_abandoned() {
|
||||||
|
let fired = TransitionFired {
|
||||||
|
story_id: StoryId("700_s".into()),
|
||||||
|
before: Stage::Coding { claim: None },
|
||||||
|
after: Stage::Abandoned {
|
||||||
|
ts: chrono::Utc::now(),
|
||||||
|
},
|
||||||
|
event: PipelineEvent::Abandon,
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
};
|
||||||
|
let line = format_audit_entry(&fired);
|
||||||
|
assert!(line.contains("from=Coding"), "from=Coding");
|
||||||
|
assert!(line.contains("to=Abandoned"), "to=Abandoned");
|
||||||
|
assert!(line.contains("event=Abandon"), "event=Abandon");
|
||||||
|
}
|
||||||
|
|
||||||
// ── ProjectionError Display ─────────────────────────────────────────
|
// ── ProjectionError Display ─────────────────────────────────────────
|
||||||
|
|||||||
@@ -18,6 +18,9 @@ pub(crate) fn spawn_event_bridges(
|
|||||||
project_root: Option<PathBuf>,
|
project_root: Option<PathBuf>,
|
||||||
agents: Arc<AgentPool>,
|
agents: Arc<AgentPool>,
|
||||||
) {
|
) {
|
||||||
|
// Audit log subscriber: write one structured line per pipeline transition.
|
||||||
|
crate::pipeline_state::spawn_audit_log_subscriber();
|
||||||
|
|
||||||
// CRDT → watcher bridge: translate CRDT stage-transition events into
|
// CRDT → watcher bridge: translate CRDT stage-transition events into
|
||||||
// WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign)
|
// WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign)
|
||||||
// see a uniform stream regardless of whether the event originated from the
|
// see a uniform stream regardless of whether the event originated from the
|
||||||
|
|||||||
Reference in New Issue
Block a user