cost.attributed Events — Every LLM Call Tracked
The CostRecord data model, the emit callback pattern that fires a cost.attributed event on every LLM call, and the CFO daily report structure that aggregates spend into a format the operator can act on.
Budget limits and loop detection are control mechanisms. The cost.attributed event is the observability mechanism that makes them useful over time.
Every LLM call produces a record. Every record fires an event. Every event can be consumed by any downstream system that needs cost data — Discord alerts, dashboards, event buses, audit stores. The CFO daily report aggregates all records into a human-readable summary that lands in the operator's Discord channel every morning.
This lesson covers the data model, the event architecture, and the report structure.
The CostRecord Data Model
Every LLM call produces a CostRecord:
import uuid
from datetime import datetime, timezone


class CostRecord:
    """Record of a single LLM call's cost."""

    def __init__(
        self,
        agent_id: str,
        session_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
    ):
        self.record_id = str(uuid.uuid4())
        self.agent_id = agent_id
        self.session_id = session_id
        self.model = model
        self.input_tokens = input_tokens
        self.output_tokens = output_tokens
        self.cost_usd = self._calculate_cost()
        self.timestamp = datetime.now(timezone.utc).isoformat()
Five fields are provided by the caller. Two are generated at record time: record_id (a UUID for deduplication and lookup) and timestamp (UTC ISO format for time-series queries).
The cost_usd field is calculated from the model and token counts at record creation:
    def _calculate_cost(self) -> float:
        pricing = MODEL_PRICING_USD_PER_MTOK.get(self.model, (0, 0))
        input_cost = (self.input_tokens / 1_000_000) * pricing[0]
        output_cost = (self.output_tokens / 1_000_000) * pricing[1]
        return round(input_cost + output_cost, 6)
The cost is rounded to 6 decimal places — sub-cent precision. This matters for aggregation: summing many small costs with too-aggressive rounding produces materially wrong totals at scale.
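To make the rounding point concrete, here is a minimal sketch. It assumes hypothetical prices of $3.00 per million input tokens and $15.00 per million output tokens (illustrative figures, not values from the real MODEL_PRICING_USD_PER_MTOK table) and compares 6-decimal rounding with rounding each record to whole cents.

```python
# Hypothetical prices in USD per million tokens (input, output); illustrative only.
PRICE_IN, PRICE_OUT = 3.00, 15.00


def call_cost(input_tokens: int, output_tokens: int, places: int) -> float:
    """Cost of one call, rounded to the given number of decimal places."""
    raw = (input_tokens / 1_000_000) * PRICE_IN + (output_tokens / 1_000_000) * PRICE_OUT
    return round(raw, places)


# 1,000 small calls of 800 input tokens and 100 output tokens each.
calls = [(800, 100)] * 1000

total_6dp = sum(call_cost(i, o, 6) for i, o in calls)
total_2dp = sum(call_cost(i, o, 2) for i, o in calls)

print(f"6 decimal places: ${total_6dp:.2f}")
print(f"2 decimal places: ${total_2dp:.2f}")
```

Under these assumed prices each call costs $0.0039. Whole-cent rounding records every call as $0.00, so the day's total collapses to zero, while 6-decimal rounding keeps the total at $3.90.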
Unknown models default to (0, 0) pricing. This is the fail-open behavior for new models: an unrecognized model does not block the call, and its cost is tracked as $0 until the pricing table is updated. The operator sees the call in the records but the cost appears free — a signal to update the pricing table.
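One way to act on that signal is to compare the models seen in the records against the pricing table. The helper below is a hypothetical sketch, not part of the tracker shown in this lesson; the function name, the pricing table contents, and the sample records are all made up for illustration.

```python
from types import SimpleNamespace


def find_unpriced_models(records, pricing: dict) -> set:
    """Return model names that appear in records but not in the pricing table."""
    return {r.model for r in records if r.model not in pricing}


# Hypothetical pricing table and records, for illustration only.
pricing = {"model-a": (3.00, 15.00)}
records = [
    SimpleNamespace(model="model-a", cost_usd=0.0039),
    SimpleNamespace(model="model-b", cost_usd=0.0),  # unknown model, tracked as $0
]

print(find_unpriced_models(records, pricing))
```

Checking table membership rather than cost_usd == 0 avoids flagging models that are deliberately priced at zero.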
The Record Method
The record() method is the call site for every LLM interaction in the system:
    def record(
        self,
        agent_id: str,
        session_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
    ) -> CostRecord:
        """Record an LLM call and its cost."""
        if input_tokens < 0 or output_tokens < 0:
            raise ValueError(
                f"Token counts must be non-negative: "
                f"input={input_tokens}, output={output_tokens}"
            )
        record = CostRecord(
            agent_id=agent_id,
            session_id=session_id,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
        )
        self._records.append(record)
        # Cap records to prevent unbounded memory growth
        if len(self._records) > self.MAX_RECORDS:
            self._records = self._records[-self.MAX_RECORDS:]
        self._daily_spend[agent_id] = (
            self._daily_spend.get(agent_id, 0) + record.cost_usd
        )
        if self._emit_callback is not None:
            try:
                self._emit_callback(record)
            except Exception as exc:
                logger.warning(f"cost.attributed emit_callback failed: {exc}")
        return record
Four things happen in sequence:
- Validation: negative token counts raise immediately. This catches upstream bugs where token counts are incorrectly extracted from API responses.
- Record creation: the CostRecord is instantiated with cost calculated.
- Index updates: the record is appended to _records (capped at 10,000), and _daily_spend[agent_id] is incremented.
- Event emission: the callback fires, wrapped in try/except.
The callback wrapping is deliberate. The _emit_callback fires to external systems — a Discord notifier, a NATS publisher, a database writer. Any of these can fail independently. A failed notification is a monitoring gap. An unrecorded cost is a FinOps integrity failure. The try/except ensures the record is always committed regardless of downstream failures.
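The isolation property can be demonstrated with a stripped-down stand-in. MiniTracker and broken_notifier below are hypothetical names used only for this sketch; the stand-in keeps just the commit-then-emit ordering of record() and drops everything else.

```python
import logging

logger = logging.getLogger("finops")


class MiniTracker:
    """Stripped-down stand-in for CostTracker: commit first, then emit."""

    def __init__(self, emit_callback=None):
        self.records = []
        self._emit_callback = emit_callback

    def record(self, cost_usd: float) -> float:
        self.records.append(cost_usd)  # the record is committed first
        if self._emit_callback is not None:
            try:
                self._emit_callback(cost_usd)  # may raise; must not propagate
            except Exception as exc:
                logger.warning("emit_callback failed: %s", exc)
        return cost_usd


def broken_notifier(cost_usd: float) -> None:
    """Simulates an external system that is down."""
    raise ConnectionError("notifier unreachable")


tracker = MiniTracker(emit_callback=broken_notifier)
tracker.record(0.0039)  # does not raise
tracker.record(0.0120)

print(tracker.records)
```

Both costs end up in the record list even though every emission failed; the only trace of the failure is a warning in the log.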
The emit_callback Pattern
The CostTracker is initialized with an optional callback:
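from typing import Callable, Optional


class CostTracker:
    # In-memory cap on stored records (10,000, as described above)
    MAX_RECORDS = 10_000

    def __init__(
        self,
        emit_callback: Optional[Callable[["CostRecord"], None]] = None,
    ):
        self._records: list[CostRecord] = []
        self._daily_spend: dict[str, float] = {}
        self._emit_callback = emit_callback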
The callback is a plain Python callable that takes a CostRecord and returns None. In production, the broker passes a function that publishes a cost.attributed event to the Djed event bus:
def publish_cost_event(record: CostRecord) -> None:
    djed.publish(
        topic="cost.attributed",
        payload={
            "event_type": "cost.attributed",
            "record_id": record.record_id,
            "agent_id": record.agent_id,
            "session_id": record.session_id,
            "model": record.model,
            "input_tokens": record.input_tokens,
            "output_tokens": record.output_tokens,
            "cost_usd": record.cost_usd,
            "timestamp": record.timestamp,
        }
    )


cost_tracker = CostTracker(emit_callback=publish_cost_event)
This design decouples the cost tracking logic from the event transport. Swap Djed for another event bus, or add a second callback for Discord notifications — the CostTracker code does not change. The callback is the extension point.
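Because the tracker accepts a single callable, adding a second consumer means composing callbacks outside the tracker. The fan_out helper below is a hypothetical sketch of one way to do that; the two consumer functions are stand-ins that append to lists rather than talking to a real event bus or Discord.

```python
from typing import Any, Callable


def fan_out(*callbacks: Callable[[Any], None]) -> Callable[[Any], None]:
    """Combine several callbacks into one; a failure in one does not skip the rest."""

    def emit(record: Any) -> None:
        for cb in callbacks:
            try:
                cb(record)
            except Exception as exc:
                print(f"callback {getattr(cb, '__name__', cb)!r} failed: {exc}")

    return emit


bus_events: list = []
discord_messages: list = []


def publish_to_bus(record: Any) -> None:
    bus_events.append(record)  # stand-in for an event bus publish


def notify_discord(record: Any) -> None:
    discord_messages.append(f"cost.attributed: {record}")  # stand-in for a notifier


combined = fan_out(publish_to_bus, notify_discord)
combined({"agent_id": "openclaw", "cost_usd": 0.0039})
```

The combined callable would be passed as emit_callback in place of a single publisher, leaving the tracker itself untouched.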
In testing, emit_callback=None is the default. Tests that verify cost recording behavior do not need to mock an event bus — they just check the records directly.
The Spend Query Interface
The tracker exposes four query methods:
    def get_agent_spend(self, agent_id: str) -> float:
        """Get total spend for an agent today."""
        return self._daily_spend.get(agent_id, 0.0)

    def get_total_spend(self) -> float:
        """Get total spend across all agents today."""
        return sum(self._daily_spend.values())

    def get_spend_by_agent(self) -> dict[str, float]:
        """Get spend breakdown by agent."""
        return dict(self._daily_spend)

    def get_spend_by_model(self) -> dict[str, float]:
        """Get spend breakdown by model."""
        by_model: dict[str, float] = {}
        for record in self._records:
            by_model[record.model] = (
                by_model.get(record.model, 0) + record.cost_usd
            )
        return by_model
get_agent_spend() and get_total_spend() read from the _daily_spend dict — O(1) and O(n agents) respectively. These are called on every budget check and must be fast.
get_spend_by_model() iterates all records — O(n records). This is called for dashboards and reports, not for enforcement. Calling it on every LLM request would be an O(n) operation on the hot path. The query methods are designed with their call frequency in mind.
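If per-model spend were ever needed on the hot path, one option would be to maintain a running total at record time, the same way _daily_spend is maintained per agent. The sketch below shows that alternative design; it is not how the tracker in this lesson works, and the class and method names are hypothetical.

```python
from collections import defaultdict


class ModelSpendIndex:
    """Running per-model totals, updated once per record."""

    def __init__(self) -> None:
        self._by_model: defaultdict[str, float] = defaultdict(float)

    def add(self, model: str, cost_usd: float) -> None:
        self._by_model[model] += cost_usd  # O(1) per LLM call

    def snapshot(self) -> dict[str, float]:
        return dict(self._by_model)  # O(n models), independent of record count


index = ModelSpendIndex()
index.add("model-a", 0.25)
index.add("model-b", 0.5)
index.add("model-a", 0.25)
print(index.snapshot())
```

A side effect of this design is that the totals no longer depend on the capped _records list, so they would stay complete even after older records are trimmed.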
The CFO Daily Report
The generate_daily_report() method assembles the nightly summary:
    def generate_daily_report(self) -> dict:
        """Generate the nightly CFO report with spend breakdowns."""
        total = self.get_total_spend()
        global_ceiling = 25.00
        return {
            "report_type": "cfo_daily",
            "period": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
            "total_spend_usd": total,
            "global_ceiling_usd": global_ceiling,
            "ceiling_utilization_pct": round(total / global_ceiling * 100, 1),
            "spend_by_agent": self.get_spend_by_agent(),
            "spend_by_model": self.get_spend_by_model(),
            "records_count": len(self._records),
            "top_spenders": sorted(
                self.get_spend_by_agent().items(),
                key=lambda x: x[1],
                reverse=True,
            )[:5],
        }
The report structure is designed for two audiences: the operator scanning for anomalies, and automated downstream consumers that act on the data.
For the operator scanning for anomalies:
- ceiling_utilization_pct: the first number to check. If it is above 80%, something is running hot.
- top_spenders: the five highest-spending agents, sorted descending. If the top spender is unexpected, it warrants investigation.
- spend_by_model: model distribution. If a disproportionate share of spend is on Opus or Sonnet when it should be Haiku, the tier routing has a gap.
For automated consumers:
- total_spend_usd: raw number for trend dashboards and cost forecasting.
- spend_by_agent: per-agent breakdown for budget utilization tracking.
- records_count: a proxy for total LLM activity volume. A spike in records without a proportional cost spike suggests agents are moving to cheaper models. A cost spike without a records spike suggests agents are using more tokens per call.
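The records_count heuristic can be sketched as a day-over-day comparison of volume and average cost per call. The functions, the 25% threshold, and the three report fragments below are hypothetical and chosen only to illustrate the two cases; a model-mix shift and a tokens-per-call shift can look alike in these two numbers, so spend_by_model is still needed to confirm the cause.

```python
def cost_per_record(report: dict) -> float:
    """Average USD per LLM call; 0.0 when there were no calls."""
    count = report["records_count"]
    return report["total_spend_usd"] / count if count else 0.0


def describe_shift(prev: dict, curr: dict, threshold: float = 0.25) -> str:
    """Classify a day-over-day change using relative change in volume and unit cost."""
    prev_unit, curr_unit = cost_per_record(prev), cost_per_record(curr)
    volume_change = (curr["records_count"] - prev["records_count"]) / max(
        prev["records_count"], 1
    )
    unit_change = (curr_unit - prev_unit) / prev_unit if prev_unit else 0.0
    if volume_change > threshold and unit_change < -threshold:
        return "more calls, cheaper per call"
    if abs(volume_change) <= threshold and unit_change > threshold:
        return "same volume, more expensive per call"
    return "no clear shift"


# Illustrative report fragments, not real data.
yesterday = {"total_spend_usd": 8.42, "records_count": 847}
today_cheaper = {"total_spend_usd": 8.60, "records_count": 1700}
today_heavier = {"total_spend_usd": 14.90, "records_count": 860}

print(describe_shift(yesterday, today_cheaper))
print(describe_shift(yesterday, today_heavier))
```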
A sample report for a typical day:
{
  "report_type": "cfo_daily",
  "period": "2026-03-28",
  "total_spend_usd": 8.42,
  "global_ceiling_usd": 25.00,
  "ceiling_utilization_pct": 33.7,
  "spend_by_agent": {
    "openclaw": 2.84,
    "advisory-system": 1.97,
    "content-pipeline": 1.63,
    "analyst-system": 0.89,
    "foresight": 0.71,
    "sports-agent": 0.38
  },
  "spend_by_model": {
    "claude-sonnet-4-6": 6.91,
    "claude-haiku-4-5-20251001": 1.51,
    "gemini-2.0-flash": 0.00
  },
  "records_count": 847,
  "top_spenders": [
    ["openclaw", 2.84],
    ["advisory-system", 1.97],
    ["content-pipeline", 1.63],
    ["analyst-system", 0.89],
    ["foresight", 0.71]
  ]
}
33.7% ceiling utilization. Top spenders are expected agents. Sonnet dominates model spend at 82%, Haiku at 18%. 847 records across the day. This is a healthy report — no anomalies, no flags.
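The headline percentages can be re-derived from the raw fields, which is a useful sanity check when reading any report. This short sketch recomputes them from the sample values above, using the same rounding as generate_daily_report().

```python
report = {
    "total_spend_usd": 8.42,
    "global_ceiling_usd": 25.00,
    "spend_by_model": {
        "claude-sonnet-4-6": 6.91,
        "claude-haiku-4-5-20251001": 1.51,
        "gemini-2.0-flash": 0.00,
    },
}

# Same formula as the report: total / ceiling * 100, rounded to one decimal.
utilization = round(report["total_spend_usd"] / report["global_ceiling_usd"] * 100, 1)

# Each model's share of total spend, as a percentage.
model_share = {
    model: round(spend / report["total_spend_usd"] * 100, 1)
    for model, spend in report["spend_by_model"].items()
}

print(utilization)
print(model_share)
```

The results are 33.7% utilization, with Sonnet at 82.1% and Haiku at 17.9% of spend, matching the figures quoted above to the nearest percent.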
Compare to an anomalous report:
{
  "ceiling_utilization_pct": 91.4,
  "top_spenders": [
    ["sentinel", 11.37],
    ["openclaw", 2.84],
    ...
  ]
}
Sentinel should spend at most $0.25/day. $11.37 is roughly 45x its budget. The most likely explanations are that Sentinel got into a loop or was triggered by a misconfigured cron. The top_spenders field surfaces this immediately: the operator does not need to scan the full spend_by_agent breakdown.
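The same check can be automated by comparing each agent's spend against its daily budget. The over_budget function and the budgets dict below are hypothetical; Sentinel's $0.25 figure comes from the text, while the openclaw budget is made up for the example.

```python
def over_budget(spend_by_agent: dict, budgets: dict) -> list:
    """Return (agent, spend, multiple_of_budget) for agents above budget, worst first."""
    flagged = [
        (agent, spend, round(spend / budgets[agent], 1))
        for agent, spend in spend_by_agent.items()
        if agent in budgets and spend > budgets[agent]
    ]
    return sorted(flagged, key=lambda row: row[2], reverse=True)


# Sentinel's $0.25/day budget comes from the text; the openclaw budget is made up.
budgets = {"sentinel": 0.25, "openclaw": 5.00}
spend = {"sentinel": 11.37, "openclaw": 2.84}

print(over_budget(spend, budgets))
```

Only Sentinel is flagged, at about 45.5 times its budget; openclaw stays under its assumed limit and is not reported.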
The Daily Reset
    def reset_daily(self):
        """Reset daily spend counters. Called at UTC midnight."""
        self._daily_spend.clear()
        self._records.clear()
The reset clears both the spend counters and the full record history. This is a hard reset — no carry-forward from the previous day.
The implication: the _records list only contains today's records. get_spend_by_model() only reflects today's model usage. Any historical analysis beyond today requires a persistent store — the audit log (SQLite), the semantic memory layer, or the event bus history. The in-memory tracker is a today-view, not a history.
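One way to get that persistence is an emit_callback that writes each record to SQLite before the nightly reset discards it. The sketch below is an assumption-laden illustration, not the project's audit log: the make_sqlite_writer name, the cost_records table, and its schema are hypothetical, and it uses an in-memory database so it runs standalone.

```python
import sqlite3
from types import SimpleNamespace


def make_sqlite_writer(conn: sqlite3.Connection):
    """Build an emit_callback that appends each record to a cost_records table."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS cost_records (
               record_id TEXT PRIMARY KEY,
               agent_id TEXT, session_id TEXT, model TEXT,
               input_tokens INTEGER, output_tokens INTEGER,
               cost_usd REAL, timestamp TEXT
           )"""
    )

    def write(record) -> None:
        # INSERT OR IGNORE makes redelivery of the same record_id harmless.
        conn.execute(
            "INSERT OR IGNORE INTO cost_records VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (record.record_id, record.agent_id, record.session_id, record.model,
             record.input_tokens, record.output_tokens, record.cost_usd,
             record.timestamp),
        )
        conn.commit()

    return write


conn = sqlite3.connect(":memory:")  # a real deployment would use a file path
writer = make_sqlite_writer(conn)

# Stand-in for a CostRecord with the same attribute names.
rec = SimpleNamespace(
    record_id="r-1", agent_id="openclaw", session_id="s-1", model="model-a",
    input_tokens=800, output_tokens=100, cost_usd=0.0039,
    timestamp="2026-03-28T12:00:00+00:00",
)
writer(rec)
writer(rec)  # duplicate record_id is ignored

rows = conn.execute(
    "SELECT agent_id, SUM(cost_usd) FROM cost_records GROUP BY agent_id"
).fetchall()
print(rows)
```

Using record_id as the primary key is what lets the UUID serve its deduplication role: a record delivered twice is stored once.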
The reset is called by a scheduled task at UTC midnight. In the broker's main.py, this is typically an asyncio background task:
import asyncio
from datetime import datetime, timedelta, timezone


async def midnight_reset():
    """Reset daily counters at UTC midnight."""
    while True:
        now = datetime.now(timezone.utc)
        # Calculate seconds until next UTC midnight
        next_midnight = (now + timedelta(days=1)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        await asyncio.sleep((next_midnight - now).total_seconds())
        cost_tracker.reset_daily()
        loop_detector_instance.reset_all_sessions()
        logger.info("Daily FinOps counters reset at UTC midnight")
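The sleep calculation is easier to test when it is pulled out into a pure function that takes the current time as an argument. The function name below is hypothetical; the arithmetic is the same as in the loop above.

```python
from datetime import datetime, timedelta, timezone


def seconds_until_utc_midnight(now: datetime) -> float:
    """Seconds from `now` (timezone-aware, UTC) to the next 00:00:00 UTC."""
    next_midnight = (now + timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return (next_midnight - now).total_seconds()


late = datetime(2026, 3, 28, 23, 59, 30, tzinfo=timezone.utc)
at_midnight = datetime(2026, 3, 29, 0, 0, 0, tzinfo=timezone.utc)

print(seconds_until_utc_midnight(late))
print(seconds_until_utc_midnight(at_midnight))
```

Thirty seconds before midnight the function returns 30.0. Exactly at midnight it returns 86400.0, a full day, so the loop does not fire twice in a row after waking up on the boundary.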
What the REST API Exposes
The GET /v1/finops/spend endpoint gives a live view of the current day's spend without the report overhead:
@router.get("/spend")
async def get_spend(request: Request) -> dict[str, Any]:
    ct = request.app.state.cost_tracker
    return {
        "total_spend_usd": ct.get_total_spend(),
        "spend_by_agent": ct.get_spend_by_agent(),
        "spend_by_model": ct.get_spend_by_model(),
    }
This endpoint is unauthenticated in the current implementation — spend data is not sensitive enough to require auth for operators on the local network. The full CFO report endpoint (GET /v1/finops/report) is admin-gated.
Next: Budget Override With Audit Trail
Lesson 209 covers the one piece of the FinOps system that requires human authorization: the budget override endpoint. How the admin REST API enforces the BROKER_ADMIN_TOKEN, what the override payload requires, and why the reason field is not optional.