diff --git a/server/src/agents/pool/cost_rollup_subscriber.rs b/server/src/agents/pool/cost_rollup_subscriber.rs new file mode 100644 index 00000000..dca2450a --- /dev/null +++ b/server/src/agents/pool/cost_rollup_subscriber.rs @@ -0,0 +1,333 @@ +//! TransitionFired subscriber that snapshots token cost into the CostRollup register +//! on terminal pipeline stage transitions. +//! +//! When a story reaches Done, Archived, Abandoned, Superseded, or Rejected, +//! this subscriber reads the accumulated token records from disk, aggregates +//! them by agent, and writes the result into +//! [`crate::service::agents::cost_rollup`] so that status renderers and the +//! cost command can read from the register instead of re-walking the JSONL file. + +use std::path::{Path, PathBuf}; + +use crate::pipeline_state::Stage; +use crate::slog; +use crate::slog_warn; + +/// Spawn a background task that maintains the CostRollup register. +/// +/// On every terminal stage transition (Done, Archived, Abandoned, Superseded, +/// Rejected), reads the token records for that story and writes an aggregated +/// [`crate::service::agents::cost_rollup::CostRollup`] to the global register. +pub(crate) fn spawn_cost_rollup_subscriber(project_root: PathBuf) { + let mut rx = crate::pipeline_state::subscribe_transitions(); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(fired) => { + on_terminal_transition(&project_root, &fired).await; + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + slog_warn!( + "[cost-rollup-sub] Subscriber lagged, skipped {n} event(s). \ + Some story cost rollups may be stale until next terminal transition." + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); +} + +/// Returns `true` if `stage` is a terminal pipeline stage. +/// +/// Terminal stages are those from which no further work is expected: +/// Done, Archived, Abandoned, Superseded, Rejected. +/// MergeFailure variants are NOT terminal — stories can recover from them. +fn is_terminal(stage: &Stage) -> bool { + matches!( + stage, + Stage::Done { .. } + | Stage::Archived { .. } + | Stage::Abandoned { .. } + | Stage::Superseded { .. } + | Stage::Rejected { .. } + ) +} + +/// Snapshot the cost data for `fired.story_id` into the register when +/// `fired.after` is a terminal stage. +pub(crate) async fn on_terminal_transition( + project_root: &Path, + fired: &crate::pipeline_state::TransitionFired, +) { + if !is_terminal(&fired.after) { + return; + } + + let story_id = &fired.story_id.0; + let records = match crate::agents::token_usage::read_all(project_root) { + Ok(r) => r, + Err(e) => { + slog_warn!("[cost-rollup-sub] Failed to read token records for '{story_id}': {e}"); + return; + } + }; + + let summary = crate::service::agents::token::aggregate_for_story(&records, story_id); + + crate::service::agents::cost_rollup::set_rollup( + project_root, + story_id, + crate::service::agents::cost_rollup::CostRollup { + story_id: story_id.clone(), + total_cost_usd: summary.total_cost_usd, + agents: summary.agents, + recorded_at: fired.at, + }, + ); + + slog!( + "[cost-rollup-sub] Rolled up cost for '{}': ${:.4}", + story_id, + summary.total_cost_usd + ); +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::agents::TokenUsage; + use crate::agents::token_usage::TokenUsageRecord; + use crate::pipeline_state::{BranchName, PipelineEvent, Stage, StoryId, TransitionFired}; + use chrono::Utc; + use std::num::NonZeroU32; + + fn make_record(story_id: &str, agent: &str, cost: f64) -> TokenUsageRecord { + TokenUsageRecord { + story_id: story_id.to_string(), + agent_name: agent.to_string(), + timestamp: Utc::now().to_rfc3339(), + model: None, + usage: TokenUsage { + input_tokens: 100, + output_tokens: 50, + cache_creation_input_tokens: 0, + cache_read_input_tokens: 0, + total_cost_usd: cost, + }, + } + } + + fn write_records(root: &std::path::Path, records: &[TokenUsageRecord]) { + for r in records { + crate::agents::token_usage::append_record(root, r).unwrap(); + } + } + + fn fired_done(story_id: &str) -> TransitionFired { + TransitionFired { + story_id: StoryId(story_id.to_string()), + before: Stage::Merge { + feature_branch: BranchName("feature/test".to_string()), + commits_ahead: NonZeroU32::new(1).unwrap(), + claim: None, + }, + after: Stage::Done { + merged_at: Utc::now(), + merge_commit: crate::pipeline_state::GitSha("abc123".to_string()), + }, + event: PipelineEvent::MergeSucceeded { + merge_commit: crate::pipeline_state::GitSha("abc123".to_string()), + }, + at: Utc::now(), + } + } + + fn fired_abandoned(story_id: &str) -> TransitionFired { + TransitionFired { + story_id: StoryId(story_id.to_string()), + before: Stage::Coding { claim: None }, + after: Stage::Abandoned { ts: Utc::now() }, + event: PipelineEvent::Abandon, + at: Utc::now(), + } + } + + fn fired_coding(story_id: &str) -> TransitionFired { + TransitionFired { + story_id: StoryId(story_id.to_string()), + before: Stage::Backlog, + after: Stage::Coding { claim: None }, + event: PipelineEvent::DepsMet, + at: Utc::now(), + } + } + + // ── AC1: subscriber writes the register on terminal transitions ────────── + + #[tokio::test] + async fn done_transition_writes_rollup() { + let tmp = tempfile::tempdir().unwrap(); + let story_id = "1017_sub_done"; + + write_records( + tmp.path(), + &[ + make_record(story_id, "coder-1", 1.50), + make_record(story_id, "qa-1", 0.50), + ], + ); + + let fired = fired_done(story_id); + on_terminal_transition(tmp.path(), &fired).await; + + let rollup = crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id) + .expect("rollup must exist after Done transition"); + assert!( + (rollup.total_cost_usd - 2.0).abs() < f64::EPSILON, + "total must be sum of all agents: {}", + rollup.total_cost_usd + ); + assert_eq!(rollup.agents.len(), 2, "must have 2 agent entries"); + } + + // ── AC4: multiple agents produce correct breakdown ──────────────────────── + + #[tokio::test] + async fn multiple_agents_correct_breakdown() { + let tmp = tempfile::tempdir().unwrap(); + let story_id = "1017_sub_multi"; + + write_records( + tmp.path(), + &[ + make_record(story_id, "coder-1", 1.00), + make_record(story_id, "coder-1", 0.50), // second run + make_record(story_id, "qa-1", 0.75), + make_record(story_id, "mergemaster", 0.25), + // Different story — must not leak into rollup + make_record("9999_other_story", "coder-1", 99.0), + ], + ); + + on_terminal_transition(tmp.path(), &fired_done(story_id)).await; + + let rollup = crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id) + .expect("rollup must exist"); + + // Total: 1.00 + 0.50 + 0.75 + 0.25 = 2.50 + assert!( + (rollup.total_cost_usd - 2.50).abs() < f64::EPSILON, + "total must be 2.50, got {}", + rollup.total_cost_usd + ); + + // coder-1 should be aggregated: 1.00 + 0.50 = 1.50 + let coder = rollup + .agents + .iter() + .find(|a| a.agent_name == "coder-1") + .expect("coder-1 must be in agents"); + assert!( + (coder.total_cost_usd - 1.50).abs() < f64::EPSILON, + "coder-1 cost must be 1.50, got {}", + coder.total_cost_usd + ); + + // Other story must NOT appear in this rollup + assert!( + rollup.agents.iter().all(|a| a.agent_name != "9999_other"), + "other story's agent must not leak into rollup" + ); + } + + // ── Non-terminal transitions are ignored ────────────────────────────────── + + #[tokio::test] + async fn non_terminal_transition_does_not_write_rollup() { + let tmp = tempfile::tempdir().unwrap(); + let story_id = "1017_sub_nonterminal"; + + write_records(tmp.path(), &[make_record(story_id, "coder-1", 5.0)]); + + // Coding transition — not terminal + on_terminal_transition(tmp.path(), &fired_coding(story_id)).await; + + assert!( + crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id).is_none(), + "non-terminal transition must not write rollup" + ); + } + + // ── Abandoned transition is also terminal ───────────────────────────────── + + #[tokio::test] + async fn abandoned_transition_writes_rollup() { + let tmp = tempfile::tempdir().unwrap(); + let story_id = "1017_sub_abandoned"; + + write_records(tmp.path(), &[make_record(story_id, "coder-1", 0.30)]); + on_terminal_transition(tmp.path(), &fired_abandoned(story_id)).await; + + let rollup = crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id) + .expect("rollup must exist after Abandoned transition"); + assert!( + (rollup.total_cost_usd - 0.30).abs() < f64::EPSILON, + "abandoned story cost must be 0.30, got {}", + rollup.total_cost_usd + ); + } + + // ── Story with no records gets a zero rollup ────────────────────────────── + + #[tokio::test] + async fn zero_cost_story_gets_empty_rollup() { + let tmp = tempfile::tempdir().unwrap(); + let story_id = "1017_sub_zero"; + + // No records written for this story + on_terminal_transition(tmp.path(), &fired_done(story_id)).await; + + let rollup = crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id) + .expect("rollup must be written even for zero-cost story"); + assert_eq!(rollup.total_cost_usd, 0.0); + assert!(rollup.agents.is_empty()); + } + + // ── Via broadcast subscriber ────────────────────────────────────────────── + + #[tokio::test] + async fn broadcast_subscriber_reacts_to_terminal_transition() { + crate::crdt_state::init_for_test(); + crate::db::ensure_content_store(); + + let tmp = tempfile::tempdir().unwrap(); + let story_id = "1017_sub_broadcast"; + + crate::db::write_item_with_content( + story_id, + "2_current", + "---\nname: Test\n---\n", + crate::db::ItemMeta::named("Test"), + ); + + write_records(tmp.path(), &[make_record(story_id, "coder-1", 1.23)]); + + spawn_cost_rollup_subscriber(tmp.path().to_path_buf()); + + crate::agents::lifecycle::abandon_story(story_id).expect("abandon must succeed"); + + // Give the subscriber task time to process. + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + + let rollup = crate::service::agents::cost_rollup::get_rollup(tmp.path(), story_id) + .expect("rollup must be written via broadcast subscriber"); + assert!( + (rollup.total_cost_usd - 1.23).abs() < f64::EPSILON, + "rollup cost must match records: {}", + rollup.total_cost_usd + ); + } +} diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index 363bd473..c539273f 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -1,5 +1,7 @@ //! Agent pool — manages the set of active agents across all pipeline stages. pub(crate) mod auto_assign; +/// TransitionFired subscriber that snapshots token cost on terminal stage transitions. +pub(crate) mod cost_rollup_subscriber; mod pipeline; mod process; mod query; diff --git a/server/src/chat/commands/cost.rs b/server/src/chat/commands/cost.rs index 1e251efc..52e1f42e 100644 --- a/server/src/chat/commands/cost.rs +++ b/server/src/chat/commands/cost.rs @@ -8,43 +8,36 @@ use super::status::story_short_label; /// Show token spend: 24h total, top 5 stories, agent-type breakdown, and /// all-time total. pub(super) fn handle_cost(ctx: &CommandContext) -> Option { - let records = match crate::agents::token_usage::read_all(ctx.effective_root()) { - Ok(r) => r, - Err(e) => return Some(format!("Failed to read token usage: {e}")), - }; + let rollups = crate::service::agents::cost_rollup::all_rollups(ctx.effective_root()); - if records.is_empty() { + if rollups.is_empty() { return Some("**Token Spend**\n\nNo usage records found.".to_string()); } let now = chrono::Utc::now(); let cutoff = now - chrono::Duration::hours(24); - // Partition into 24h window and all-time - let mut recent = Vec::new(); - let mut all_time_cost = 0.0; - for r in &records { - all_time_cost += r.usage.total_cost_usd; - if let Ok(ts) = chrono::DateTime::parse_from_rfc3339(&r.timestamp) - && ts >= cutoff - { + // Partition into 24h window (stories that completed recently) and all-time. + let mut recent: Vec<&crate::service::agents::cost_rollup::CostRollup> = Vec::new(); + let mut all_time_cost = 0.0_f64; + for r in &rollups { + all_time_cost += r.total_cost_usd; + if r.recorded_at >= cutoff { recent.push(r); } } - // 24h total - let recent_cost: f64 = recent.iter().map(|r| r.usage.total_cost_usd).sum(); + let recent_cost: f64 = recent.iter().map(|r| r.total_cost_usd).sum(); let mut out = String::from("**Token Spend**\n\n"); out.push_str(&format!("**Last 24h:** ${:.2}\n", recent_cost)); out.push_str(&format!("**All-time:** ${:.2}\n\n", all_time_cost)); // Top 5 most expensive stories (last 24h) - let mut story_costs: HashMap<&str, f64> = HashMap::new(); - for r in &recent { - *story_costs.entry(r.story_id.as_str()).or_default() += r.usage.total_cost_usd; - } - let mut story_list: Vec<(&str, f64)> = story_costs.into_iter().collect(); + let mut story_list: Vec<(&str, f64)> = recent + .iter() + .map(|r| (r.story_id.as_str(), r.total_cost_usd)) + .collect(); story_list.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); story_list.truncate(5); @@ -59,13 +52,15 @@ pub(super) fn handle_cost(ctx: &CommandContext) -> Option { } out.push('\n'); - // Breakdown by agent type (last 24h) + // Breakdown by agent type (last 24h) — derived from per-agent data in rollups. // Agent names follow pattern "coder-1", "qa-1", "mergemaster" — extract // the type as everything before the last '-' digit, or the full name. let mut type_costs: HashMap = HashMap::new(); - for r in &recent { - let agent_type = extract_agent_type(&r.agent_name); - *type_costs.entry(agent_type).or_default() += r.usage.total_cost_usd; + for rollup in &recent { + for agent in &rollup.agents { + let agent_type = extract_agent_type(&agent.agent_name); + *type_costs.entry(agent_type).or_default() += agent.total_cost_usd; + } } let mut type_list: Vec<(String, f64)> = type_costs.into_iter().collect(); type_list.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); @@ -108,6 +103,8 @@ mod tests { for r in records { crate::agents::token_usage::append_record(root, r).unwrap(); } + // Pre-populate the register so the cost command can read from it. + crate::service::agents::cost_rollup::init_from_disk(root); } fn make_usage(cost: f64) -> crate::agents::TokenUsage { diff --git a/server/src/chat/commands/status/render.rs b/server/src/chat/commands/status/render.rs index 6d2ea558..a0fbe9f9 100644 --- a/server/src/chat/commands/status/render.rs +++ b/server/src/chat/commands/status/render.rs @@ -104,14 +104,13 @@ pub(crate) fn build_status_from_items( .map(|a| (a.story_id.clone(), a)) .collect(); - // Read token usage once for all stories to avoid repeated file I/O. - let cost_by_story: HashMap = crate::agents::token_usage::read_all(project_root) - .unwrap_or_default() - .into_iter() - .fold(HashMap::new(), |mut map, r| { - *map.entry(r.story_id).or_insert(0.0) += r.usage.total_cost_usd; - map - }); + // Build a per-story cost map from the in-memory rollup register. + // Only completed stories have entries; in-progress stories show no cost. + let cost_by_story: HashMap = + crate::service::agents::cost_rollup::all_rollups(project_root) + .into_iter() + .map(|r| (r.story_id, r.total_cost_usd)) + .collect(); let config = ProjectConfig::load(project_root).ok(); diff --git a/server/src/chat/commands/status/tests.rs b/server/src/chat/commands/status/tests.rs index 04e101a9..ad8bda22 100644 --- a/server/src/chat/commands/status/tests.rs +++ b/server/src/chat/commands/status/tests.rs @@ -181,6 +181,7 @@ fn status_shows_cost_when_token_usage_exists() { usage, ); crate::agents::token_usage::append_record(tmp.path(), &record).unwrap(); + crate::service::agents::cost_rollup::init_from_disk(tmp.path()); let agents = AgentPool::new_test(3000); let output = build_status_from_items(tmp.path(), &agents, &items); @@ -239,6 +240,7 @@ fn status_aggregates_multiple_records_per_story() { ); crate::agents::token_usage::append_record(tmp.path(), &record).unwrap(); } + crate::service::agents::cost_rollup::init_from_disk(tmp.path()); let agents = AgentPool::new_test(3000); let output = build_status_from_items(tmp.path(), &agents, &items); diff --git a/server/src/service/agents/cost_rollup.rs b/server/src/service/agents/cost_rollup.rs new file mode 100644 index 00000000..85def0a5 --- /dev/null +++ b/server/src/service/agents/cost_rollup.rs @@ -0,0 +1,266 @@ +//! In-memory CostRollup register, keyed by project root. +//! +//! Keying by `project_root` provides test isolation: each test's `TempDir` +//! maps to its own slice of the store, so parallel tests cannot pollute +//! each other's data. +//! +//! Populated on server startup from existing JSONL records +//! ([`init_from_disk`]) and kept current by the cost-rollup subscriber +//! (`agents::pool::cost_rollup_subscriber`) which fires on every terminal +//! pipeline stage transition. +//! +//! All readers call [`get_rollup`] or [`all_rollups`] instead of +//! re-walking `token_usage.jsonl`. + +use chrono::{DateTime, Utc}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::{OnceLock, RwLock}; + +use super::token::{AgentTokenCost, TokenCostSummary}; + +// ── Types ───────────────────────────────────────────────────────────────────── + +/// Snapshotted cost totals for a story at the time it reached a terminal stage. +#[derive(Debug, Clone)] +pub struct CostRollup { + /// The story this rollup belongs to. + pub story_id: String, + /// Total USD spend across all agents. + pub total_cost_usd: f64, + /// Per-agent breakdown, sorted alphabetically by agent name. + pub agents: Vec, + /// When the rollup was written (terminal transition timestamp or, for + /// startup-bootstrapped entries, the latest token record timestamp). + pub recorded_at: DateTime, +} + +impl CostRollup { + /// Convert to a [`TokenCostSummary`] for callers that expect the legacy type. + pub fn as_summary(&self) -> TokenCostSummary { + TokenCostSummary { + total_cost_usd: self.total_cost_usd, + agents: self.agents.clone(), + } + } +} + +// ── Global store ────────────────────────────────────────────────────────────── + +/// Composite key: (canonical project root, story_id). +type StoreKey = (PathBuf, String); + +static STORE: OnceLock>> = OnceLock::new(); + +fn store() -> &'static RwLock> { + STORE.get_or_init(|| RwLock::new(HashMap::new())) +} + +fn canonical(p: &Path) -> PathBuf { + p.canonicalize().unwrap_or_else(|_| p.to_path_buf()) +} + +/// Look up the cost rollup for `story_id` within `project_root`. +/// +/// Returns `None` for stories that have not yet reached a terminal stage +/// (or have never had any token records). +pub fn get_rollup(project_root: &Path, story_id: &str) -> Option { + let key = (canonical(project_root), story_id.to_string()); + store().read().ok()?.get(&key).cloned() +} + +/// Write or overwrite the cost rollup for `story_id` within `project_root`. +pub fn set_rollup(project_root: &Path, story_id: impl Into, rollup: CostRollup) { + let key = (canonical(project_root), story_id.into()); + if let Ok(mut guard) = store().write() { + guard.insert(key, rollup); + } +} + +/// Return a snapshot of all rollups for `project_root`, in arbitrary order. +pub fn all_rollups(project_root: &Path) -> Vec { + let root = canonical(project_root); + store() + .read() + .map(|g| { + g.iter() + .filter(|((r, _), _)| r == &root) + .map(|(_, v)| v.clone()) + .collect() + }) + .unwrap_or_default() +} + +// ── Startup bootstrap ───────────────────────────────────────────────────────── + +/// Pre-populate the register from `token_usage.jsonl`. +/// +/// Called once at server startup before the live subscriber starts listening. +/// Stories with zero cost are skipped. `recorded_at` is set to the latest +/// record timestamp for each story so the 24 h window in the cost command +/// reflects when work was actually done rather than when the server restarted. +pub fn init_from_disk(project_root: &Path) { + let records = match crate::agents::token_usage::read_all(project_root) { + Ok(r) => r, + Err(e) => { + crate::slog_warn!("[cost-rollup] init_from_disk failed to read token records: {e}"); + return; + } + }; + + if records.is_empty() { + return; + } + + // Collect unique story IDs. + let mut story_ids: Vec<&str> = records.iter().map(|r| r.story_id.as_str()).collect(); + story_ids.sort_unstable(); + story_ids.dedup(); + + for sid in story_ids { + let summary = super::token::aggregate_for_story(&records, sid); + if summary.total_cost_usd == 0.0 && summary.agents.is_empty() { + continue; + } + + // Use the latest record timestamp as recorded_at. + let recorded_at = records + .iter() + .filter(|r| r.story_id == sid) + .filter_map(|r| chrono::DateTime::parse_from_rfc3339(&r.timestamp).ok()) + .map(|ts| ts.with_timezone(&Utc)) + .max() + .unwrap_or_else(Utc::now); + + set_rollup( + project_root, + sid, + CostRollup { + story_id: sid.to_string(), + total_cost_usd: summary.total_cost_usd, + agents: summary.agents, + recorded_at, + }, + ); + } +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::agents::TokenUsage; + use crate::agents::token_usage::TokenUsageRecord; + + fn dummy_root() -> PathBuf { + std::env::temp_dir().join("cost_rollup_dummy_root") + } + + fn make_record(story_id: &str, agent: &str, cost: f64, ts: &str) -> TokenUsageRecord { + TokenUsageRecord { + story_id: story_id.to_string(), + agent_name: agent.to_string(), + timestamp: ts.to_string(), + model: None, + usage: TokenUsage { + input_tokens: 100, + output_tokens: 50, + cache_creation_input_tokens: 0, + cache_read_input_tokens: 0, + total_cost_usd: cost, + }, + } + } + + #[test] + fn get_rollup_returns_none_for_unknown_story() { + let root = dummy_root(); + assert!(get_rollup(&root, "999_no_such_story_xyzzy").is_none()); + } + + #[test] + fn set_and_get_rollup_roundtrip() { + let root = dummy_root(); + let rollup = CostRollup { + story_id: "1017_test_roundtrip".to_string(), + total_cost_usd: 3.50, + agents: vec![], + recorded_at: Utc::now(), + }; + set_rollup(&root, "1017_test_roundtrip", rollup); + let got = get_rollup(&root, "1017_test_roundtrip").expect("rollup must exist after set"); + assert!((got.total_cost_usd - 3.50).abs() < f64::EPSILON); + } + + #[test] + fn init_from_disk_populates_store() { + let tmp = tempfile::tempdir().unwrap(); + let records = vec![ + make_record("1017_init_a", "coder-1", 1.0, "2026-01-01T00:00:00Z"), + make_record("1017_init_a", "qa-1", 0.5, "2026-01-02T00:00:00Z"), + make_record("1017_init_b", "coder-1", 2.0, "2026-01-01T12:00:00Z"), + ]; + for r in &records { + crate::agents::token_usage::append_record(tmp.path(), r).unwrap(); + } + init_from_disk(tmp.path()); + + let a = get_rollup(tmp.path(), "1017_init_a").expect("story a must be in store"); + assert!((a.total_cost_usd - 1.5).abs() < f64::EPSILON); + assert_eq!(a.agents.len(), 2); + + let b = get_rollup(tmp.path(), "1017_init_b").expect("story b must be in store"); + assert!((b.total_cost_usd - 2.0).abs() < f64::EPSILON); + } + + #[test] + fn init_from_disk_uses_latest_timestamp_as_recorded_at() { + let tmp = tempfile::tempdir().unwrap(); + let records = vec![ + make_record("1017_ts_test", "coder-1", 1.0, "2026-01-01T00:00:00Z"), + make_record("1017_ts_test", "qa-1", 0.5, "2026-01-03T00:00:00Z"), + ]; + for r in &records { + crate::agents::token_usage::append_record(tmp.path(), r).unwrap(); + } + init_from_disk(tmp.path()); + + let rollup = get_rollup(tmp.path(), "1017_ts_test").expect("rollup must exist"); + // recorded_at should be 2026-01-03 (the later timestamp) + assert_eq!(rollup.recorded_at.date_naive().to_string(), "2026-01-03"); + } + + #[test] + fn all_rollups_isolated_by_project_root() { + let tmp_a = tempfile::tempdir().unwrap(); + let tmp_b = tempfile::tempdir().unwrap(); + set_rollup( + tmp_a.path(), + "1017_all_a", + CostRollup { + story_id: "1017_all_a".to_string(), + total_cost_usd: 1.0, + agents: vec![], + recorded_at: Utc::now(), + }, + ); + set_rollup( + tmp_b.path(), + "1017_all_b", + CostRollup { + story_id: "1017_all_b".to_string(), + total_cost_usd: 2.0, + agents: vec![], + recorded_at: Utc::now(), + }, + ); + let all_a = all_rollups(tmp_a.path()); + assert!(all_a.iter().any(|r| r.story_id == "1017_all_a")); + assert!(!all_a.iter().any(|r| r.story_id == "1017_all_b")); + + let all_b = all_rollups(tmp_b.path()); + assert!(all_b.iter().any(|r| r.story_id == "1017_all_b")); + assert!(!all_b.iter().any(|r| r.story_id == "1017_all_a")); + } +} diff --git a/server/src/service/agents/mod.rs b/server/src/service/agents/mod.rs index f4124799..5f8cbf03 100644 --- a/server/src/service/agents/mod.rs +++ b/server/src/service/agents/mod.rs @@ -6,6 +6,8 @@ //! into `AgentPool` or the filesystem. //! //! Conventions: `docs/architecture/service-modules.md` +/// In-memory cost rollup register — written by the cost-rollup subscriber. +pub mod cost_rollup; mod io; /// Agent selection heuristics — pick the best agent for a story. pub mod selection; @@ -215,13 +217,20 @@ pub fn get_test_results( io::read_test_results_from_file(project_root, story_id) } -/// Get the aggregated token cost for a specific story. +/// Get the aggregated token cost for a specific story from the rollup register. +/// +/// Returns a zero-cost summary when no rollup has been recorded yet (story +/// still in progress or no token records exist). pub fn get_work_item_token_cost( project_root: &Path, story_id: &str, ) -> Result { - let records = io::read_token_records(project_root)?; - Ok(token::aggregate_for_story(&records, story_id)) + Ok(cost_rollup::get_rollup(project_root, story_id) + .map(|r| r.as_summary()) + .unwrap_or(TokenCostSummary { + total_cost_usd: 0.0, + agents: vec![], + })) } /// Get all token usage records across all stories. diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 0839bb78..55b7a714 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -73,6 +73,14 @@ pub(crate) fn spawn_event_bridges( // accumulating in the process heap (story 996). crate::db::gc::spawn_content_gc_subscriber(); + // Cost-rollup bootstrap: pre-populate the register from existing JSONL + // so status renderers show correct costs after a server restart. + crate::service::agents::cost_rollup::init_from_disk(&root); + + // Cost-rollup subscriber: snapshots per-story token costs into the + // in-memory register whenever a story reaches a terminal stage. + crate::agents::pool::cost_rollup_subscriber::spawn_cost_rollup_subscriber(root.clone()); + let watcher_auto_rx = watcher_tx.subscribe(); let watcher_auto_agents = Arc::clone(&agents); tokio::spawn(async move {