//! TransitionFired subscriber that snapshots token cost into the CostRollup register //! on terminal pipeline stage transitions. //! //! When a story reaches Done, Archived, Abandoned, Superseded, or Rejected, //! this subscriber reads the accumulated token records from disk, aggregates //! them by agent, and writes the result into //! [`crate::service::agents::cost_rollup`] so that status renderers and the //! cost command can read from the register instead of re-walking the JSONL file. use std::path::{Path, PathBuf}; use crate::pipeline_state::Stage; use crate::slog; use crate::slog_warn; /// Spawn a background task that maintains the CostRollup register. /// /// On every terminal stage transition (Done, Archived, Abandoned, Superseded, /// Rejected), reads the token records for that story and writes an aggregated /// [`crate::service::agents::cost_rollup::CostRollup`] to the global register. pub(crate) fn spawn_cost_rollup_subscriber(project_root: PathBuf) { let mut rx = crate::pipeline_state::subscribe_transitions(); tokio::spawn(async move { loop { match rx.recv().await { Ok(fired) => { on_terminal_transition(&project_root, &fired).await; } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { slog_warn!( "[cost-rollup-sub] Subscriber lagged, skipped {n} event(s). \ Some story cost rollups may be stale until next terminal transition." ); } Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } }); } /// Returns `true` if `stage` is a terminal pipeline stage. /// /// Terminal stages are those from which no further work is expected: /// Done, Archived, Abandoned, Superseded, Rejected. /// MergeFailure variants are NOT terminal — stories can recover from them. fn is_terminal(stage: &Stage) -> bool { matches!( stage, Stage::Done { .. } | Stage::Archived { .. } | Stage::Abandoned { .. } | Stage::Superseded { .. } | Stage::Rejected { .. } ) } /// Snapshot the cost data for `fired.story_id` into the register when /// `fired.after` is a terminal stage. pub(crate) async fn on_terminal_transition( project_root: &Path, fired: &crate::pipeline_state::TransitionFired, ) { if !is_terminal(&fired.after) { return; } let story_id = &fired.story_id.0; let records = match crate::agents::token_usage::read_all(project_root) { Ok(r) => r, Err(e) => { slog_warn!("[cost-rollup-sub] Failed to read token records for '{story_id}': {e}"); return; } }; let summary = crate::service::agents::token::aggregate_for_story(&records, story_id); crate::service::agents::cost_rollup::set_rollup( project_root, story_id, crate::service::agents::cost_rollup::CostRollup { story_id: story_id.clone(), total_cost_usd: summary.total_cost_usd, agents: summary.agents, recorded_at: fired.at, }, ); slog!( "[cost-rollup-sub] Rolled up cost for '{}': ${:.4}", story_id, summary.total_cost_usd ); } // ── Tests ───────────────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use super::*; use crate::agents::TokenUsage; use crate::agents::token_usage::TokenUsageRecord; use crate::pipeline_state::{BranchName, PipelineEvent, Stage, StoryId, TransitionFired}; use chrono::Utc; use std::num::NonZeroU32; fn make_record(story_id: &str, agent: &str, cost: f64) -> TokenUsageRecord { TokenUsageRecord { story_id: story_id.to_string(), agent_name: agent.to_string(), timestamp: Utc::now().to_rfc3339(), model: None, usage: TokenUsage { input_tokens: 100, output_tokens: 50, cache_creation_input_tokens: 0, cache_read_input_tokens: 0, total_cost_usd: cost, }, } } fn write_records(root: &std::path::Path, records: &[TokenUsageRecord]) { for r in records { crate::agents::token_usage::append_record(root, r).unwrap(); } } fn fired_done(story_id: &str) -> TransitionFired { TransitionFired { story_id: StoryId(story_id.to_string()), before: Stage::Merge { feature_branch: BranchName("feature/test".to_string()), commits_ahead: NonZeroU32::new(1).unwrap(), claim: None, }, after: Stage::Done { merged_at: Utc::now(), merge_commit: crate::pipeline_state::GitSha("abc123".to_string()), }, event: PipelineEvent::MergeSucceeded { merge_commit: crate::pipeline_state::GitSha("abc123".to_string()), }, at: Utc::now(), } } fn fired_abandoned(story_id: &str) -> TransitionFired { TransitionFired { story_id: StoryId(story_id.to_string()), before: Stage::Coding { claim: None }, after: Stage::Abandoned { ts: Utc::now() }, event: PipelineEvent::Abandon, at: Utc::now(), } } fn fired_coding(story_id: &str) -> TransitionFired { TransitionFired { story_id: StoryId(story_id.to_string()), before: Stage::Backlog, after: Stage::Coding { claim: None }, event: PipelineEvent::DepsMet, at: Utc::now(), } } // ── AC1: subscriber writes the register on terminal transitions ────────── #[tokio::test] async fn done_transition_writes_rollup() { let tmp = tempfile::tempdir().unwrap(); let story_id = "1017_sub_done"; write_records( tmp.path(), &[ make_record(story_id, "coder-1", 1.50), make_record(story_id, "qa-1", 0.50), ], ); let fired = fired_done(story_id); on_terminal_transition(tmp.path(), &fired).await; let rollup = crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id) .expect("rollup must exist after Done transition"); assert!( (rollup.total_cost_usd - 2.0).abs() < f64::EPSILON, "total must be sum of all agents: {}", rollup.total_cost_usd ); assert_eq!(rollup.agents.len(), 2, "must have 2 agent entries"); } // ── AC4: multiple agents produce correct breakdown ──────────────────────── #[tokio::test] async fn multiple_agents_correct_breakdown() { let tmp = tempfile::tempdir().unwrap(); let story_id = "1017_sub_multi"; write_records( tmp.path(), &[ make_record(story_id, "coder-1", 1.00), make_record(story_id, "coder-1", 0.50), // second run make_record(story_id, "qa-1", 0.75), make_record(story_id, "mergemaster", 0.25), // Different story — must not leak into rollup make_record("9999_other_story", "coder-1", 99.0), ], ); on_terminal_transition(tmp.path(), &fired_done(story_id)).await; let rollup = crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id) .expect("rollup must exist"); // Total: 1.00 + 0.50 + 0.75 + 0.25 = 2.50 assert!( (rollup.total_cost_usd - 2.50).abs() < f64::EPSILON, "total must be 2.50, got {}", rollup.total_cost_usd ); // coder-1 should be aggregated: 1.00 + 0.50 = 1.50 let coder = rollup .agents .iter() .find(|a| a.agent_name == "coder-1") .expect("coder-1 must be in agents"); assert!( (coder.total_cost_usd - 1.50).abs() < f64::EPSILON, "coder-1 cost must be 1.50, got {}", coder.total_cost_usd ); // Other story must NOT appear in this rollup assert!( rollup.agents.iter().all(|a| a.agent_name != "9999_other"), "other story's agent must not leak into rollup" ); } // ── Non-terminal transitions are ignored ────────────────────────────────── #[tokio::test] async fn non_terminal_transition_does_not_write_rollup() { let tmp = tempfile::tempdir().unwrap(); let story_id = "1017_sub_nonterminal"; write_records(tmp.path(), &[make_record(story_id, "coder-1", 5.0)]); // Coding transition — not terminal on_terminal_transition(tmp.path(), &fired_coding(story_id)).await; assert!( crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id).is_none(), "non-terminal transition must not write rollup" ); } // ── Abandoned transition is also terminal ───────────────────────────────── #[tokio::test] async fn abandoned_transition_writes_rollup() { let tmp = tempfile::tempdir().unwrap(); let story_id = "1017_sub_abandoned"; write_records(tmp.path(), &[make_record(story_id, "coder-1", 0.30)]); on_terminal_transition(tmp.path(), &fired_abandoned(story_id)).await; let rollup = crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id) .expect("rollup must exist after Abandoned transition"); assert!( (rollup.total_cost_usd - 0.30).abs() < f64::EPSILON, "abandoned story cost must be 0.30, got {}", rollup.total_cost_usd ); } // ── Story with no records gets a zero rollup ────────────────────────────── #[tokio::test] async fn zero_cost_story_gets_empty_rollup() { let tmp = tempfile::tempdir().unwrap(); let story_id = "1017_sub_zero"; // No records written for this story on_terminal_transition(tmp.path(), &fired_done(story_id)).await; let rollup = crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id) .expect("rollup must be written even for zero-cost story"); assert_eq!(rollup.total_cost_usd, 0.0); assert!(rollup.agents.is_empty()); } // ── Via broadcast subscriber ────────────────────────────────────────────── #[tokio::test] async fn broadcast_subscriber_reacts_to_terminal_transition() { crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); let tmp = tempfile::tempdir().unwrap(); let story_id = "1017_sub_broadcast"; crate::db::write_item_with_content( story_id, "2_current", "---\nname: Test\n---\n", crate::db::ItemMeta::named("Test"), ); write_records(tmp.path(), &[make_record(story_id, "coder-1", 1.23)]); spawn_cost_rollup_subscriber(tmp.path().to_path_buf()); crate::agents::lifecycle::abandon_story(story_id).expect("abandon must succeed"); // Give the subscriber task time to process. tokio::time::sleep(std::time::Duration::from_millis(200)).await; let rollup = crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id) .expect("rollup must be written via broadcast subscriber"); assert!( (rollup.total_cost_usd - 1.23).abs() < f64::EPSILON, "rollup cost must match records: {}", rollup.total_cost_usd ); } }