# Metrics

Each metric manages its own tables, registration, and computation.

Last updated: March 2026
## Design
Each metric is self-contained. A metric:
- Declares its database tables — the metric owns its schema, created on registration
- Registers itself — declares a name, its query interface, and (optionally) the events it subscribes to
- Computes results — either by querying billing tables directly (same-database mode) or by consuming events from Kafka (ingestion mode)
- Exposes queries — the `MetricsEngine` delegates queries to the appropriate metric
This means adding a new metric requires zero changes to existing code. Write a metric class, register it, done.
## Dual-Mode Computation
Metrics support two computation modes, matching the dual architecture:
- Event-driven mode (ingestion) — the metric consumes events from Kafka and maintains its own materialized tables. This is the primary mode, used with Stripe and any webhook-based connector.
- Direct-query mode (same-database) — the metric queries billing tables via the `DatabaseConnector` at query time. Used with Lago/Kill Bill. No Kafka, no event processing. Lower priority.
Metrics must support at least one mode. The same metric can support both — using event-driven processing by default and falling back to direct queries when a DatabaseConnector is available.
## Metric Transparency
Every metric must document its computation methodology:
- Formula — the mathematical definition
- SQL — the actual query used to compute the metric
- Assumptions — what counts as "active", how partial months are handled, etc.
- Edge cases — how the metric behaves with zero customers, mid-month changes, etc.
This transparency is the project's core differentiator. Metric definitions are code: reviewable, forkable, contributable. No black boxes.
## Base Class
```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any

from sqlalchemy import MetaData

from tidemill.metrics.query import Cube
from tidemill.segments.model import SegmentDef

# `Event` (the normalized billing event delivered by the Kafka consumer) is only
# used in annotations here; its import is omitted from this excerpt.


@dataclass
class QuerySpec:
    """Declarative query specification resolved against a metric's Cube.

    Dimensions and filters reference names defined in the model (e.g., "plan_interval",
    "customer_country"). The model validates them and compiles to SQL via fragment
    composition. See [Cubes & Query Algebra](cubes.md) and [Segmentation](segments.md).

    Return shape:
      - dimensions set → list of dicts, one row per group
      - compare set    → list of dicts, one row per `segment_id` (ratio metrics
                         return per-segment numerator/denominator pairs already
                         divided — see "Compare-mode return shape" below)
      - otherwise      → scalar or time series
    """

    # Dimensions to group by (names from model.Dimensions)
    #   "plan_id"          — one row per plan
    #   "plan_interval"    — one row per billing interval (monthly/yearly/...)
    #   "customer_country" — one row per country
    #   "source_id"        — one row per billing source
    #   "currency"         — one row per currency (uses *_cents, not base-currency aggregate)
    #   "cohort_month"     — retention-specific: one row per cohort
    #   Computed dims from MRR cubes: "mrr_band", "arr_band", "tenure_months",
    #   "cohort_month" (CASE WHEN / DATE_TRUNC expressions)
    dimensions: list[str] = field(default_factory=list)

    # Filters: dimension_name → value (equality) or {op: value} for other operators
    #   {"customer_country": "US"}                        — equality
    #   {"plan_interval": {"in": ["monthly", "yearly"]}}  — IN list
    filters: dict[str, Any] = field(default_factory=dict)

    # Time bucketing
    granularity: str | None = None  # day | week | month | quarter | year
    time_range: tuple[str, str] | None = None

    # Segmentation — see segments.md for the AST + compilation model
    #   segment — a parsed SegmentDef applied as a universe filter (AND'd
    #             into every row, narrows the rowset for the whole query)
    #   compare — list of (segment_id, SegmentDef) pairs; compile() emits a
    #             CROSS JOIN over a VALUES list of segment_ids and a compound
    #             OR predicate so each row is tagged with every branch it
    #             matches (overlapping segments produce duplicate rows, by
    #             design)
    segment: SegmentDef | None = None
    compare: tuple[tuple[str, SegmentDef], ...] | None = None


class Metric(ABC):
    """Base class for metrics.

    All I/O methods are async — metrics use SQLAlchemy AsyncSession for database
    access and await connector calls. The engine and consumers are fully async.
    """

    # The metric's main Cube (e.g. MRRSnapshotCube for MRR); set by each metric.
    model: type[Cube]

    @property
    @abstractmethod
    def name(self) -> str:
        """Metric identifier, e.g. 'mrr', 'churn'."""
        ...

    @property
    def dependencies(self) -> list[str]:
        """Names of other metrics this metric depends on.

        The engine ensures all dependencies are registered and initialized
        before this metric is started.

        Example: RetentionMetric reads the MRR movement table for NRR / GRR,
        so it declares ['mrr']. The engine injects the MRR metric instance so
        this metric can query its tables.
        """
        return []

    @property
    def event_types(self) -> list[str]:
        """Event types this metric subscribes to (ingestion mode).

        Return empty list if metric only supports direct-query mode."""
        return []

    @property
    def primary_cube(self) -> type[Cube]:
        """The cube exposed as this metric's filter / group-by surface.

        Used by the discovery endpoint (`GET /api/metrics/{name}/fields`)
        and the segment validator so generic routers don't have to
        hard-code which cube belongs to which metric. Defaults to
        `self.model`. Metrics with multiple cubes (e.g. churn — event +
        state) override to pick the one that carries the richest
        end-user filter set.
        """
        return self.model

    def register_tables(self, metadata: MetaData) -> None:
        """Define SQLAlchemy tables owned by this metric.

        Synchronous — called once at startup before the event loop runs."""
        pass

    async def handle_event(self, event: Event) -> None:
        """Process a single event (ingestion mode). Must be idempotent.

        Called by the async Kafka consumer for each matching event."""
        raise NotImplementedError("This metric does not support event-driven mode")

    @abstractmethod
    async def query(self, params: dict, spec: QuerySpec | None = None) -> Any:
        """Answer a metric query.

        params: query-type-specific parameters (at, start, end, interval, ...)
        spec:   optional QuerySpec with dimensions and filters. Names reference
                the metric's Cube. All built-in metrics support this.

        Async — issues SQL via AsyncSession or awaits DatabaseConnector methods.
        """
        ...
```
## Registry
Metrics register themselves via a decorator:
```python
from tidemill.metrics import register

@register
class MrrMetric(Metric):
    name = "mrr"
    dependencies = []  # no dependencies
    event_types = ["subscription.created", "subscription.activated",
                   "subscription.changed", "subscription.canceled",
                   "subscription.churned", "subscription.reactivated",
                   "subscription.paused", "subscription.resumed",
                   "invoice.paid"]  # invoice.paid drives the trailing-3m usage component

    async def query(self, params, spec=None):
        if self.connector:  # same-database mode (Lago/Kill Bill)
            return await self._query_direct(params, spec)
        # Ingestion mode (Stripe) — primary path
        return await self._query_materialized(params, spec)

@register
class RetentionMetric(Metric):
    name = "retention"
    dependencies = ["mrr"]  # engine injects mrr metric at startup
    event_types = ["subscription.created", "subscription.activated",
                   "subscription.churned", "subscription.reactivated",
                   "customer.created"]

    async def query(self, params, spec=None):
        # NRR / GRR query metric_mrr_movement directly; cohort matrix reads
        # metric_retention_cohort + metric_retention_activity.
        ...
```
The Quick Ratio is not a registered metric — it is a pure derivation from MRR movements. It lives in the reports layer as tidemill.reports.mrr.quick_ratio(tm, start, end) and in the summary endpoint's post-processing.
At startup, the engine:
- Discovers all registered metrics
- Resolves dependency order — topological sort of the dependency graph; raises on cycles
- Initializes metrics in dependency order; injects resolved instances
- Calls `register_tables()` on each — tables are added to the SQLAlchemy metadata
- Runs Alembic migrations (or `metadata.create_all()` in dev)
- Ingestion mode only: starts a Kafka consumer per metric (consumer group: `tidemill.metric.{name}`)
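The dependency-resolution step is a standard topological sort. Below is a minimal sketch using the standard library's `graphlib` (Python 3.9+), whose `static_order()` raises `graphlib.CycleError` on a cycle. `resolve_order` is a hypothetical stand-in for the engine's resolver. It works on a plain name → dependencies mapping instead of `Metric` instances, and it rejects unregistered dependencies up front.

```python
from graphlib import CycleError, TopologicalSorter


def resolve_order(deps: dict[str, list[str]]) -> list[str]:
    """Return metric names so that every metric comes after its dependencies.

    deps maps metric name -> names it depends on. Raises ValueError for a
    dependency that is not registered and graphlib.CycleError on a cycle.
    """
    for name, needed in deps.items():
        missing = [d for d in needed if d not in deps]
        if missing:
            raise ValueError(f"{name!r} depends on unregistered metric(s): {missing}")
    # TopologicalSorter expects node -> predecessors, which is exactly `deps`
    return list(TopologicalSorter(deps).static_order())


order = resolve_order({"mrr": [], "churn": [], "retention": ["mrr"],
                       "ltv": ["mrr", "churn"], "usage_revenue": ["mrr"]})
```

Because `CycleError` subclasses `ValueError`, callers that only catch `ValueError` handle both failure modes.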
## Lifecycle

### Ingestion Mode (Stripe) — Primary
```text
Startup                     Runtime                   Query
│                           │                         │
│ register_tables()         │ Kafka event arrives     │ GET /api/metrics/mrr
│ create tables if needed   │ ──────────────────►     │ ─────────────────►
│ seek to last offset       │ handle_event(event)     │ metric.query(params)
│                           │ update metric tables    │ → SQL against metric_* tables
│                           │ commit offset           │ return result
```
### Replay / Backfill (Ingestion Mode Only)
When a new metric is added to an existing deployment:
- Metric tables are created (empty)
- Consumer group is new, so Kafka offset starts at the beginning (or reads from `event_log`)
- All historical events are replayed through `handle_event()`
- Metric catches up to head and starts processing live events
To recompute a metric from scratch: reset the consumer group offset to 0 and truncate the metric's tables.
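Replay works only because `handle_event()` must be idempotent. The runtime commits the Kafka offset after updating the metric tables, so a crash between the two redelivers the event. A replay over tables that were not truncated also sees events that were already applied. Below is a minimal in-memory sketch of the `event_id ... UNIQUE` guard used by the movement tables. The `Event` and `MovementLog` classes are hypothetical stand-ins.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Event:
    """Hypothetical minimal event carrying only what this sketch needs."""
    event_id: str
    subscription_id: str
    amount_cents: int


@dataclass
class MovementLog:
    """In-memory stand-in for an append-only table with a UNIQUE event_id."""
    rows: dict[str, tuple[str, int]] = field(default_factory=dict)

    def handle_event(self, event: Event) -> None:
        # Same effect as INSERT ... ON CONFLICT (event_id) DO NOTHING:
        # a redelivered or replayed event leaves the log unchanged.
        if event.event_id in self.rows:
            return
        self.rows[event.event_id] = (event.subscription_id, event.amount_cents)

    def total(self) -> int:
        return sum(amount for _, amount in self.rows.values())


events = [Event("e1", "sub_1", 5_000), Event("e2", "sub_2", 2_000)]
log = MovementLog()
for e in events + events:  # second pass: the same events delivered again
    log.handle_event(e)
print(log.total())  # 7000
```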
### Same-Database Mode (Lago/Kill Bill) — Alternative
```text
Startup                     Query
│                           │
│ register_tables()         │ GET /api/metrics/mrr
│ create tables if needed   │ ─────────────────►
│                           │ connector.get_mrr_cents()
│                           │ → SQL against billing tables
│                           │ return result
```
No Kafka, no event processing. The database connector queries billing tables at request time.
## Priority
| Metric | Priority | Scope |
|---|---|---|
| MRR | P0 | MRR, ARR, net new MRR breakdown |
| Churn | P0 | Logo churn, revenue churn, net revenue churn |
| Retention | P0 | Monthly cohorts, NRR, GRR |
| LTV | P1 | LTV, ARPU, cohort LTV |
| Trials | P1 | Trial conversion rate |
## Built-in Metrics

### MRR (P0)
Subscribes to (ingestion mode): subscription.created, subscription.activated, subscription.changed, subscription.canceled, subscription.churned, subscription.reactivated, subscription.paused, subscription.resumed, invoice.paid (drives the trailing-3m usage component)
Direct queries (same-database mode): DatabaseConnector.get_mrr_cents(), DatabaseConnector.get_subscription_changes()
Tables:
```sql
-- Running MRR snapshot, updated on every subscription event
CREATE TABLE metric_mrr_snapshot (
    id UUID PRIMARY KEY,
    source_id UUID NOT NULL,
    customer_id TEXT NOT NULL,
    subscription_id TEXT NOT NULL,
    -- Combined MRR = subscription_mrr + usage_mrr (kept for read efficiency).
    mrr_cents BIGINT NOT NULL,
    mrr_base_cents BIGINT NOT NULL,
    -- Component breakdown — subscription is licensed-recurring (Stripe
    -- non-metered items); usage is the trailing-3m smoothed metered charge.
    subscription_mrr_cents BIGINT NOT NULL DEFAULT 0,
    subscription_mrr_base_cents BIGINT NOT NULL DEFAULT 0,
    usage_mrr_cents BIGINT NOT NULL DEFAULT 0,
    usage_mrr_base_cents BIGINT NOT NULL DEFAULT 0,
    currency TEXT NOT NULL,  -- ISO 4217
    snapshot_at TIMESTAMPTZ NOT NULL,
    UNIQUE(source_id, subscription_id)
);

-- MRR movements (append-only log for breakdown queries)
CREATE TABLE metric_mrr_movement (
    id UUID PRIMARY KEY,
    event_id UUID NOT NULL UNIQUE,  -- idempotency: one movement per event
    source_id UUID NOT NULL,
    customer_id TEXT NOT NULL,
    subscription_id TEXT NOT NULL,
    movement_type TEXT NOT NULL,    -- new | expansion | contraction | churn | reactivation
    -- Origin: 'subscription' (licensed-plan changes) or 'usage' (trailing-3m
    -- component shift). Lets the waterfall split deltas by source.
    source TEXT NOT NULL DEFAULT 'subscription',
    amount_cents BIGINT NOT NULL,
    amount_base_cents BIGINT NOT NULL,
    currency TEXT NOT NULL,
    occurred_at TIMESTAMPTZ NOT NULL
);

-- Per-subscription monthly bucket of finalized usage charges. Trailing-3m
-- average of this table feeds metric_mrr_snapshot.usage_mrr_*. Also the
-- canonical store backing the usage_revenue metric.
CREATE TABLE metric_mrr_usage_component (
    id UUID PRIMARY KEY,
    source_id UUID NOT NULL,
    customer_id TEXT NOT NULL,
    subscription_id TEXT NOT NULL,
    period_start DATE NOT NULL,  -- first-of-month UTC
    usage_cents BIGINT NOT NULL,
    usage_base_cents BIGINT NOT NULL,
    currency TEXT NOT NULL,
    computed_at TIMESTAMPTZ NOT NULL,
    UNIQUE(source_id, subscription_id, period_start)
);
```
Usage component (trailing 3-month): On every invoice.paid event, the MRR metric sums the invoice's kind='usage' line items into the bucket for that billing month, recomputes the trailing-3m mean for the subscription, and emits an expansion/contraction movement (tagged source='usage') when the mean shifts. See tidemill/metrics/mrr/usage.py and docs/definitions.md#mrr for the formal definition. This is what makes pure-usage customers (e.g. the seeded "Active Starter" archetype) visible to downstream churn and LTV — without it they show $0 MRR and are invisible to logo-churn entirely.
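The usage-component update described above can be sketched in plain Python. This is a hedged illustration, not the code in `tidemill/metrics/mrr/usage.py`, and the helper names are hypothetical. Several choices are assumptions: months with no bucket count as zero, the mean is floored to whole cents, and the window ends at the invoice's billing month. The formal definition lives in `docs/definitions.md#mrr`.

```python
from datetime import date


def trailing_3m_mean(buckets: dict[date, int], month: date) -> int:
    """Mean of the usage buckets for `month` and the two months before it.

    Assumptions: missing months count as 0; the mean is floored to whole cents.
    """
    total, y, m = 0, month.year, month.month
    for _ in range(3):
        total += buckets.get(date(y, m, 1), 0)
        y, m = (y, m - 1) if m > 1 else (y - 1, 12)
    return total // 3


def apply_paid_invoice(buckets: dict[date, int], billing_month: date,
                       usage_line_cents: list[int], prev_usage_mrr: int):
    """Fold one invoice's kind='usage' lines into its month; return (mean, movement)."""
    key = billing_month.replace(day=1)
    buckets[key] = buckets.get(key, 0) + sum(usage_line_cents)
    new_mean = trailing_3m_mean(buckets, key)
    delta = new_mean - prev_usage_mrr
    if delta == 0:
        return new_mean, None  # mean unchanged: no movement
    kind = "expansion" if delta > 0 else "contraction"
    return new_mean, {"movement_type": kind, "source": "usage", "amount_cents": delta}
```

Smoothing over three months means a single large usage invoice raises MRR by only a third of its value, and the effect decays as that month leaves the window.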
Event handling:
```python
async def handle_event(self, event: Event) -> None:
    match event.type:
        case "subscription.created":
            # Snapshot only — "new" movement deferred to subscription.activated
            # to avoid double-counting trials that later convert.
            await self._upsert_snapshot(event, event.payload["mrr_cents"],
                                        event.payload["mrr_base_cents"])
        case "subscription.activated":
            await self._upsert_snapshot(event, event.payload["mrr_cents"],
                                        event.payload["mrr_base_cents"])
            await self._append_movement(event, "new", event.payload["mrr_cents"],
                                        event.payload["mrr_base_cents"])
        case "subscription.changed":
            prev, prev_base = event.payload["prev_mrr_cents"], event.payload["prev_mrr_base_cents"]
            new, new_base = event.payload["new_mrr_cents"], event.payload["new_mrr_base_cents"]
            await self._upsert_snapshot(event, new, new_base)
            delta, delta_base = new - prev, new_base - prev_base
            if delta != 0:  # a change with no MRR impact produces no movement
                kind = "expansion" if delta > 0 else "contraction"
                await self._append_movement(event, kind, delta, delta_base)
        case "subscription.churned":
            prev, prev_base = event.payload["prev_mrr_cents"], event.payload["prev_mrr_base_cents"]
            await self._upsert_snapshot(event, 0, 0)
            await self._append_movement(event, "churn", -prev, -prev_base)
        case "subscription.reactivated":
            mrr, mrr_base = event.payload["mrr_cents"], event.payload["mrr_base_cents"]
            await self._upsert_snapshot(event, mrr, mrr_base)
            await self._append_movement(event, "reactivation", mrr, mrr_base)
        case "subscription.paused":
            mrr, mrr_base = event.payload["mrr_cents"], event.payload["mrr_base_cents"]
            await self._upsert_snapshot(event, 0, 0)
            await self._append_movement(event, "churn", -mrr, -mrr_base)
        case "subscription.resumed":
            mrr, mrr_base = event.payload["mrr_cents"], event.payload["mrr_base_cents"]
            await self._upsert_snapshot(event, mrr, mrr_base)
            await self._append_movement(event, "reactivation", mrr, mrr_base)
        # subscription.canceled matches no case here: it changes neither the
        # snapshot nor the movement log. invoice.paid (usage component) is
        # handled as described above (tidemill/metrics/mrr/usage.py).
```
Queries (dual-mode):
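```python
async def query(self, params: dict, spec: QuerySpec | None = None) -> Any:
    match params.get("query_type"):
        case "current":
            if self.connector:
                # Same-database mode: query billing tables directly
                return await self.connector.get_mrr_base_cents(params.get("at"))
            # Ingestion mode (primary): query materialized metric_mrr_snapshot
            return await self._current_mrr(params.get("at"), spec)
        case "series":
            return await self._mrr_series(params["start"], params["end"],
                                          params["interval"], spec)
        case "breakdown":
            return await self._mrr_breakdown(params["start"], params["end"], spec)
        case "arr":
            current = await self.query({**params, "query_type": "current"}, spec)
            if isinstance(current, list):
                # Dimensional / compare result: scale each row's measure instead of
                # repeating the list (assumes the measure column is labelled "mrr")
                return [{**row, "mrr": row["mrr"] * 12} for row in current]
            return current * 12
        case other:
            raise ValueError(f"Unknown MRR query_type: {other!r}")
```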
Query building with cubes and fragment composition:
Each MRR query method composes `QueryFragment` objects from `MRRSnapshotCube` or `MRRMovementCube`. The model declares available joins, measures, and dimensions; fragments carry the required joins automatically. See [Cubes & Query Algebra](cubes.md) for the full approach.
```python
async def _current_mrr(self, at: date | None, spec: QuerySpec | None):
    # Use original-currency measure when caller groups by currency
    use_original = spec and "currency" in (spec.dimensions or [])
    m = self.model  # MRRSnapshotCube
    measure = m.measures.mrr_original if use_original else m.measures.mrr

    # Base: always-present fragments
    q = measure + m.where("s.mrr_base_cents", ">", 0)

    # Time filter
    if at:
        q = q + m.filter("snapshot_at", "<=", at)

    # Apply user-requested dimensions, filters, segment (universe filter),
    # and compare (per-branch slicing) from spec. build_spec_fragment is
    # the single bridge between QuerySpec and Cube — see
    # tidemill/segments/compiler.py.
    q = q + await build_spec_fragment(m, spec, self.db)

    stmt, params = q.compile(m)
    result = await self.db.execute(stmt, params)
    rows = result.mappings().all()

    # Scalar when no dimensions and no compare, list of dicts otherwise.
    has_compare = bool(spec and spec.compare)
    if not spec or (not spec.dimensions and not has_compare):
        return rows[0]["mrr"] if rows else 0
    return [dict(r) for r in rows]
```
Example: SQL generated for `_current_mrr` with various specs

No spec — plain aggregate:

```sql
SELECT SUM(s.mrr_base_cents) AS mrr
FROM metric_mrr_snapshot s
WHERE s.mrr_base_cents > 0
```

`QuerySpec(filters={"plan_interval": {"in": ["yearly"]}})` — filter only, no dimensions:

```sql
SELECT SUM(s.mrr_base_cents) AS mrr
FROM metric_mrr_snapshot s
JOIN subscription sub ON sub.source_id = s.source_id AND sub.external_id = s.subscription_id
JOIN plan p ON p.id = sub.plan_id
WHERE s.mrr_base_cents > 0
  AND p.interval = ANY(:plan_interval)
```

`QuerySpec(dimensions=["plan_interval", "customer_country"])` — dimensional cut:

```sql
SELECT p.interval AS plan_interval,
       c.country AS customer_country,
       SUM(s.mrr_base_cents) AS mrr
FROM metric_mrr_snapshot s
JOIN subscription sub ON sub.source_id = s.source_id AND sub.external_id = s.subscription_id
JOIN plan p ON p.id = sub.plan_id
JOIN customer c ON c.source_id = s.source_id AND c.external_id = s.customer_id
WHERE s.mrr_base_cents > 0
GROUP BY p.interval, c.country
```

`QuerySpec(filters={"customer_country": {"in": ["US", "DE"]}}, dimensions=["plan_id"])` — filter + dimension:

```sql
SELECT sub.plan_id,
       SUM(s.mrr_base_cents) AS mrr
FROM metric_mrr_snapshot s
JOIN subscription sub ON sub.source_id = s.source_id AND sub.external_id = s.subscription_id
JOIN customer c ON c.source_id = s.source_id AND c.external_id = s.customer_id
WHERE s.mrr_base_cents > 0
  AND c.country = ANY(:customer_country)
GROUP BY sub.plan_id
```
Breakdown query with fragment composition:
```python
async def _mrr_breakdown(self, start: date, end: date, spec: QuerySpec | None):
    mm = self.movement_model  # MRRMovementCube
    q = (
        mm.measures.amount
        + mm.dimension("movement_type")
        + mm.filter("occurred_at", "between", (start, end))
    )
    q = q + await build_spec_fragment(mm, spec, self.db)
    stmt, params = q.compile(mm)
    result = await self.db.execute(stmt, params)
    return [dict(r) for r in result.mappings().all()]
```
Example: breakdown SQL with `QuerySpec(dimensions=["plan_id"])`:

```sql
SELECT m.movement_type,
       sub.plan_id,
       SUM(m.amount_base_cents) AS amount_base
FROM metric_mrr_movement m
JOIN subscription sub ON sub.source_id = m.source_id AND sub.external_id = m.subscription_id
WHERE m.occurred_at BETWEEN :start AND :end
GROUP BY m.movement_type, sub.plan_id
```
Waterfall query — monthly MRR bridge:
The waterfall builds a month-by-month bridge from starting MRR to ending MRR. It combines a baseline snapshot query with movement aggregation:
1. Baseline — query `metric_mrr_snapshot` for total MRR at the start of the range (same as `_current_mrr(start)`)
2. Movements — query `metric_mrr_movement` grouped by `date_trunc('month', occurred_at)` and `movement_type`
3. Accumulate — walk months in order; each month's `starting_mrr` = previous month's `ending_mrr`, and `ending_mrr` = `starting_mrr` + sum of movements
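```python
from datetime import date

import pandas as pd

MOVEMENT_TYPES = ("new", "expansion", "contraction", "churn", "reactivation")

async def _mrr_waterfall(self, start: date, end: date, spec: QuerySpec | None):
    months = pd.date_range(start, end, freq="MS")  # month starts within the range
    # Step 1: baseline (assumes a scalar result, i.e. no dimensions / compare)
    baseline = await self._current_mrr(start, spec)

    mm = self.movement_model  # MRRMovementCube
    q = (  # step 2: movements per (month, movement_type)
        mm.measures.amount
        + mm.dimension("movement_type")
        + mm.filter("occurred_at", "between", (start, end))
        + mm.time_grain("occurred_at", "month")
    )
    q = q + await build_spec_fragment(mm, spec, self.db)  # same slicing as baseline
    stmt, sql_params = q.compile(mm)
    result = await self.db.execute(stmt, sql_params)
    # Index by (month, movement_type); column labels follow the SQL shown below
    amounts = {(r["period"].date(), r["movement_type"]): r["amount_base"]
               for r in result.mappings()}

    waterfall = []  # step 3: accumulate month by month
    ending_mrr = baseline
    for ts in months:
        month = ts.date()
        starting_mrr = ending_mrr
        # Months without movements get zeros, so MRR carries forward unchanged
        changes = {t: amounts.get((month, t), 0) for t in MOVEMENT_TYPES}
        net_change = sum(changes.values())  # contraction/churn are already negative
        ending_mrr = starting_mrr + net_change
        waterfall.append({"month": month, "starting_mrr": starting_mrr, **changes,
                          "net_change": net_change, "ending_mrr": ending_mrr})
    return waterfall
```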
The movement query SQL (step 2):
```sql
SELECT date_trunc('month', m.occurred_at) AS period,
       m.movement_type,
       SUM(m.amount_base_cents) AS amount_base
FROM metric_mrr_movement m
WHERE m.occurred_at BETWEEN :start AND :end
GROUP BY period, m.movement_type
```
Months with no movements appear with all-zero changes and MRR carried forward from the previous month.
### Churn (P0)
Subscribes to (ingestion mode): subscription.churned, subscription.canceled, subscription.created, subscription.activated, subscription.reactivated
Metrics: Logo churn rate, revenue churn rate, net revenue churn rate
Tables:
```sql
-- Tracks which customers are active at any point in time
CREATE TABLE metric_churn_customer_state (
    id UUID PRIMARY KEY,
    source_id UUID NOT NULL,
    customer_id TEXT NOT NULL,
    active_subscriptions INT NOT NULL DEFAULT 0,
    first_active_at TIMESTAMPTZ,
    churned_at TIMESTAMPTZ,
    UNIQUE(source_id, customer_id)
);

-- Churn events for rate calculation. `cancel_reason` is copied from the
-- originating subscription.canceled / subscription.churned payload so the
-- event can be segmented by reason without re-joining the subscription
-- table (sourced from Stripe's `cancellation_details.feedback`).
CREATE TABLE metric_churn_event (
    id UUID PRIMARY KEY,
    event_id UUID NOT NULL UNIQUE,
    source_id UUID NOT NULL,
    customer_id TEXT NOT NULL,
    churn_type TEXT NOT NULL,  -- logo | revenue | canceled
    cancel_reason TEXT,        -- too_expensive | missing_features | …
    mrr_cents BIGINT,          -- revenue lost (for revenue churn)
    occurred_at TIMESTAMPTZ NOT NULL
);
```
Queries:
Logo churn rate:
```sql
-- Customers who churned in period / customers active at period start
SELECT
    (SELECT COUNT(*) FROM metric_churn_event
     WHERE churn_type = 'logo'
       AND occurred_at BETWEEN :start AND :end)::float
    /
    NULLIF((SELECT COUNT(*) FROM metric_churn_customer_state
            WHERE first_active_at < :start
              AND (churned_at IS NULL OR churned_at >= :start)), 0)
    AS logo_churn_rate
```

Revenue churn rate (computed in base currency for cross-currency correctness):

```sql
SELECT
    ABS(SUM(CASE WHEN m.movement_type = 'churn' THEN m.amount_base_cents ELSE 0 END))::float
    /
    NULLIF((SELECT SUM(mrr_base_cents) FROM metric_mrr_snapshot WHERE snapshot_at < :start), 0)
    AS revenue_churn_rate
FROM metric_mrr_movement m
WHERE m.occurred_at BETWEEN :start AND :end
```
### Retention (P0)
Subscribes to (ingestion mode): subscription.created, subscription.activated, subscription.churned, subscription.reactivated, customer.created
Tables:
```sql
-- Cohort membership (immutable once set)
CREATE TABLE metric_retention_cohort (
    id UUID PRIMARY KEY,
    source_id UUID NOT NULL,
    customer_id TEXT NOT NULL,
    cohort_month DATE NOT NULL,  -- month of first subscription
    UNIQUE(source_id, customer_id)
);

-- Monthly activity (one row per customer per active month)
CREATE TABLE metric_retention_activity (
    id UUID PRIMARY KEY,
    source_id UUID NOT NULL,
    customer_id TEXT NOT NULL,
    active_month DATE NOT NULL,
    UNIQUE(source_id, customer_id, active_month)
);
```
Query — cohort retention matrix:
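```sql
WITH cohort_size AS (
    SELECT cohort_month, COUNT(*) AS customers
    FROM metric_retention_cohort
    WHERE cohort_month BETWEEN :start AND :end
    GROUP BY cohort_month
)
SELECT
    c.cohort_month,
    a.active_month,
    -- Denominator is the whole cohort, not just the members active this month
    COUNT(DISTINCT a.customer_id)::float / NULLIF(cs.customers, 0) AS retention_rate,
    -- age() yields years + months; EXTRACT(MONTH ...) alone wraps at 12
    (EXTRACT(YEAR FROM age(a.active_month, c.cohort_month)) * 12
     + EXTRACT(MONTH FROM age(a.active_month, c.cohort_month)))::int AS months_since
FROM metric_retention_cohort c
JOIN cohort_size cs ON cs.cohort_month = c.cohort_month
LEFT JOIN metric_retention_activity a
    ON c.customer_id = a.customer_id AND c.source_id = a.source_id
WHERE c.cohort_month BETWEEN :start AND :end
GROUP BY c.cohort_month, a.active_month, cs.customers
ORDER BY c.cohort_month, a.active_month
```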
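Computing the same matrix in plain Python makes two subtle points explicit. First, the rate's denominator is the full cohort. Second, `months_since` must count whole calendar months across year boundaries. PostgreSQL's `EXTRACT(MONTH FROM age(...))` returns only the month field of the interval, so 13 months reads as 1. Below is a hedged sketch with hypothetical helper names. It keys on `customer_id` only, assuming a single billing source.

```python
from collections import defaultdict
from datetime import date


def months_between(start: date, end: date) -> int:
    """Whole calendar months from start to end (first-of-month dates)."""
    return (end.year - start.year) * 12 + (end.month - start.month)


def retention_matrix(cohort: dict[str, date],
                     activity: set[tuple[str, date]]) -> dict[tuple[date, int], float]:
    """(cohort_month, months_since) -> share of the whole cohort active that month."""
    size: dict[date, int] = defaultdict(int)
    for month in cohort.values():
        size[month] += 1
    active: dict[tuple[date, int], set[str]] = defaultdict(set)
    for customer, month in activity:
        cohort_month = cohort.get(customer)
        if cohort_month is not None:
            active[(cohort_month, months_between(cohort_month, month))].add(customer)
    # Denominator is the cohort size, not the number of members active that month
    return {key: len(members) / size[key[0]] for key, members in active.items()}
```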
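Net Revenue Retention (NRR) and Gross Revenue Retention (GRR):

These query the MRR metric's movement table. In the formulas below, expansion, contraction, and churn are magnitudes. The movement table stores contraction and churn as negative amounts, so their absolute values are what get subtracted.

NRR = (start_mrr + expansion - contraction - churn) / start_mrr
GRR = (start_mrr - contraction - churn) / start_mrr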
### LTV (P1)
Subscribes to (ingestion mode): invoice.paid
Dependencies: mrr (ARPU queries MRR snapshot), churn (simple LTV uses logo churn rate)
Tables:
```sql
-- Append-only log of paid invoices (idempotent via event_id)
CREATE TABLE metric_ltv_invoice (
    id UUID PRIMARY KEY,
    event_id UUID NOT NULL UNIQUE,      -- idempotency key
    source_id UUID NOT NULL,
    customer_id TEXT NOT NULL,
    amount_cents BIGINT NOT NULL,       -- original currency
    amount_base_cents BIGINT NOT NULL,  -- base currency at FX rate on paid_at
    currency TEXT NOT NULL,             -- ISO 4217
    paid_at TIMESTAMPTZ NOT NULL
);
```
Event handling:
```python
async def handle_event(self, event: Event) -> None:
    # invoice.paid → insert with ON CONFLICT (event_id) DO NOTHING
    # Amount converted to base currency via FX module
    ...  # implementation omitted
```
Queries:
ARPU (queries via MRRSnapshotCube — the customer_count measure on that cube exists for this purpose):
```python
sm = MRRSnapshotCube
q = (
    sm.measures.mrr
    + sm.measures.customer_count
    + sm.where("s.mrr_base_cents", ">", 0)
)
if at:
    q = q + sm.filter("snapshot_at", "<=", at)
q = q + await build_spec_fragment(sm, spec, self.db)
```

```sql
SELECT SUM(s.mrr_base_cents) AS mrr,
       COUNT(DISTINCT s.customer_id) AS customer_count
FROM metric_mrr_snapshot AS s
WHERE s.mrr_base_cents > 0
```
Simple LTV: ARPU / logo_churn_rate — delegates to MRR (via MRRSnapshotCube) and Churn metrics.
Cohort LTV uses the same cohort definition as retention — month of a
customer's first new MRR movement — so the denominator matches ARPU
(both count customers with at least one active subscription, MRR > 0).
Trials that never convert are excluded. Computed in two cube queries and
joined in Python:
- `cohort_by_customer` from `MRRMovementCube` filtered to `movement_type = 'new'`, grouped by `customer_id` + month (take earliest month per customer).
- `revenue_by_customer` from `LtvInvoiceCube` filtered to `paid_at BETWEEN :start AND :end`, grouped by `customer_id`.
- Group by cohort month: `customer_count = |cohort|`, `total_revenue = SUM(invoices for cohort members)`.
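The Python-side join described above can be sketched as follows. This is a hedged illustration with hypothetical names, and it takes inputs already fetched from the two cubes. It shows why non-converting trials drop out: they never produce a `new` movement, so they never enter a cohort. It also shows that cohort members with no paid invoices in range still count toward `customer_count`.

```python
from collections import defaultdict
from datetime import date


def cohort_ltv(new_movements: list[tuple[str, date]],
               revenue_by_customer: dict[str, int]) -> dict[date, dict[str, int]]:
    """Combine the two cube results into per-cohort customer counts and revenue.

    new_movements: (customer_id, month) rows from the movement_type = 'new' query.
    revenue_by_customer: customer_id -> paid invoice total in range (base cents).
    """
    # Cohort = earliest month with a 'new' movement; never-converted trials have none
    cohort_by_customer: dict[str, date] = {}
    for customer, month in new_movements:
        if customer not in cohort_by_customer or month < cohort_by_customer[customer]:
            cohort_by_customer[customer] = month

    # Group by cohort month; members without paid invoices count with 0 revenue
    result: dict[date, dict[str, int]] = defaultdict(
        lambda: {"customer_count": 0, "total_revenue": 0})
    for customer, month in cohort_by_customer.items():
        result[month]["customer_count"] += 1
        result[month]["total_revenue"] += revenue_by_customer.get(customer, 0)
    return dict(result)
```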
Cubes:
- `LtvInvoiceCube` — measures: `total_revenue`, `total_revenue_original`, `invoice_count`, `customer_count`; dimensions: `source_id`, `customer_id`, `currency`, `customer_country` (via customer join), `cohort_month` (via retention cohort join, still available but unused for cohort LTV).
- `MRRMovementCube` — used by ARPU (`at` set) and Cohort LTV for movement-history queries.
- `MRRSnapshotCube` — used by ARPU (`at=None`) for current-state reads.
API endpoints: GET /metrics/ltv (simple), GET /metrics/ltv/arpu, GET /metrics/ltv/cohort
### Trials (P1)
Subscribes to (ingestion mode): subscription.trial_started, subscription.trial_converted, subscription.trial_expired
Trial metrics are cohort-based: a trial is attributed to the period of its started_at, and its converted/expired outcome rolls up to the same cohort regardless of when the outcome event arrives. Late conversions retroactively update the cohort's rate.
Tables:
```sql
-- Append-only lifecycle log (audit / idempotent via event_id)
CREATE TABLE metric_trial_event (
    id UUID PRIMARY KEY,
    event_id UUID NOT NULL UNIQUE,
    source_id UUID NOT NULL,
    customer_id TEXT NOT NULL,
    subscription_id TEXT NOT NULL,
    event_type TEXT NOT NULL,  -- started | converted | expired
    occurred_at TIMESTAMPTZ NOT NULL
);

-- One row per trial. Cohort queries aggregate over this table.
CREATE TABLE metric_trial (
    id TEXT PRIMARY KEY,
    source_id TEXT NOT NULL,
    customer_id TEXT NOT NULL,
    subscription_id TEXT NOT NULL,
    started_at TIMESTAMPTZ NOT NULL,
    converted_at TIMESTAMPTZ,
    expired_at TIMESTAMPTZ,
    CONSTRAINT uq_trial_sub UNIQUE (source_id, subscription_id)
);
```
Event handling:
```python
# subscription.trial_started   → upsert started_at   (LEAST of existing/new)
# subscription.trial_converted → upsert converted_at (COALESCE — set once)
# subscription.trial_expired   → upsert expired_at   (COALESCE — set once)
# Each event is also appended to metric_trial_event (ON CONFLICT event_id DO NOTHING).
# Out-of-order events are safe: a converted/expired arriving before started
# inserts a placeholder row; the later started event corrects started_at.
```
Queries:
Funnel — a single aggregate over metric_trial filtered by started_at:
```python
q = (m.measures.started_count      # COUNT(*)
     + m.measures.converted_count  # COUNT(converted_at) — non-null
     + m.measures.expired_count    # COUNT(expired_at) — non-null
     + m.filter("started_at", "between", (start, end)))
```
Conversion rate — converted / started from the funnel result.
Series — same aggregate with time_grain("started_at", interval) → one row per cohort month.
Cube: TrialCube — source metric_trial; measures: started_count, converted_count, expired_count, customer_count; dimensions: source_id, customer_country (via customer join); time dimensions: started_at, converted_at, expired_at.
API endpoints: GET /metrics/trials (conversion rate), GET /metrics/trials/series, GET /metrics/trials/funnel
### Usage Revenue (P1)
Sibling metric to MRR. Reports the raw monthly usage charges as actuals — distinct from MRR's smoothed trailing-3m usage component. No event subscription: the canonical store (metric_mrr_usage_component) is populated by the MRR metric's invoice.paid handler. Usage Revenue declares no new tables and re-aggregates that data via its own Cube. Useful for auditing meter events, reconciling against Stripe invoices, and answering "how much did customers actually pay for usage in month \(m\)" without smoothing.
Cube: UsageRevenueCube — source metric_mrr_usage_component; measures: revenue, revenue_original, subscription_count, customer_count; dimensions: source_id, customer_id, subscription_id, currency, plan/product/customer joins, tenure_months, cohort_month; time dimension: period_start.
Dependencies: mrr — usage_revenue must initialize after MRR so the upstream invoice.paid handler is wired before any query runs.
API endpoints: GET /metrics/usage-revenue (total), GET /metrics/usage-revenue/series, GET /metrics/usage-revenue/by-customer
## MetricsEngine
The engine dynamically discovers and delegates to metrics. It has no hardcoded metric methods — every metric is reached through the registry.
```python
class MetricsEngine:
    def __init__(
        self,
        db: AsyncSession,
        metrics: list[Metric] | None = None,
    ):
        self.db = db
        # Auto-discover all @register metrics if none provided
        raw = metrics if metrics is not None else discover_metrics()
        # Resolve dependency order (topological sort); raises on cycles or missing deps
        ordered = resolve_dependencies(raw)
        # Initialize in order, injecting resolved instances
        # (synchronous — no I/O, just wiring)
        self._metrics: dict[str, Metric] = {}
        for m in ordered:
            deps = {name: self._metrics[name] for name in m.dependencies}
            m.init(db=db, deps=deps)
            self._metrics[m.name] = m

    async def query(
        self,
        metric: str,
        params: dict,
        spec: QuerySpec | None = None,
    ) -> Any:
        """Route a query to the named metric. Works for any registered metric —
        built-in or custom — without engine changes.

        spec is validated against the metric's Cube inside the metric's query
        methods. Invalid dimension/filter names raise ValueError with available
        options.
        """
        if metric not in self._metrics:
            raise KeyError(f"No metric registered for '{metric}'. "
                           f"Available: {sorted(self._metrics)}")
        return await self._metrics[metric].query(params, spec=spec)

    def available_metrics(self) -> list[str]:
        """Return names of all registered metrics. Synchronous."""
        return sorted(self._metrics)
```
Same-database mode (Lago / Kill Bill) is reached by passing a DatabaseConnector into the specific metric's constructor or init() — not via the engine. Each metric decides how to dispatch between ingestion-mode reads (SQL against metric_* tables) and direct-query mode (via its connector).
### Usage
```python
from datetime import date

engine = MetricsEngine(db=async_session)

# Any registered metric, any params — always awaited
await engine.query("mrr", {"query_type": "current"})
await engine.query("mrr", {"query_type": "series", "start": date(2025, 1, 1),
                           "end": date(2025, 12, 31), "interval": "month"})
await engine.query("churn", {"start": date(2026, 1, 1), "end": date(2026, 2, 28),
                             "type": "revenue"})

# With dimensions and filters via QuerySpec
spec = QuerySpec(
    dimensions=["plan_interval", "customer_country"],
    filters={"customer_country": "US"},
    granularity="month",
)
await engine.query("mrr", {"query_type": "current"}, spec=spec)

# Per-currency breakdown (uses original-currency amounts)
await engine.query("mrr", {"query_type": "current"},
                   spec=QuerySpec(dimensions=["currency"]))

# Synchronous — no I/O
engine.available_metrics()
# ['churn', 'ltv', 'mrr', 'retention', 'trials', 'usage_revenue']
```
The same engine.query() call is used from FastAPI, CLI, and Jupyter. The HTTP API maps URL path segments to metric names, query parameters to params, and dimension/filter parameters to QuerySpec.
Segmentation (QuerySpec)

QuerySpec carries four orthogonal slicing concerns:

- Filters: restrict which rows are included (`filters` dict → WHERE clauses)
- Dimensional cuts: split the result by named dimensions (`dimensions` list → GROUP BY)
- Segment: universe filter from a saved `SegmentDef` (AND'd into every row)
- Compare: list of `(segment_id, SegmentDef)` pairs producing one tagged row per matching branch (CROSS JOIN VALUES + compound OR)
All four reference dimension names defined in the metric's Cube, or attribute keys and static joins routed through the segment compiler. Each SQL query a metric builds calls await build_spec_fragment(cube, spec, self.db) exactly once, and that helper folds all four concerns into the QueryFragment. Ratio metrics therefore call it once for the numerator sub-query and once for the denominator sub-query.
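To make the four concerns concrete, here is a toy compiler that renders them as SQL text against a single fact table aliased `f`. It is not `build_spec_fragment`, which produces a `QueryFragment` compiled to a SQLAlchemy `Select`. The sketch also simplifies in several ways:

- Segment definitions are reduced to plain `column = value` dicts.
- The `fact` table name, the `cmp` alias and the `:pN` bind-parameter naming are assumptions for illustration.
- Column names are interpolated unchecked. The real query methods validate them against the Cube first.

```python
from __future__ import annotations


def compile_spec(
    measure: str,
    dimensions: list[str],
    filters: dict[str, object],
    segment: dict[str, object] | None = None,
    compare: list[tuple[str, dict[str, object]]] | None = None,
) -> tuple[str, dict[str, object]]:
    """Render filters/dimensions/segment/compare as SQL text plus bind params (toy)."""
    binds: dict[str, object] = {}

    def bind(value: object) -> str:
        # Every literal becomes a named bind parameter instead of inline SQL.
        name = f"p{len(binds)}"
        binds[name] = value
        return f":{name}"

    def conj(conds: dict[str, object]) -> list[str]:
        return [f"f.{col} = {bind(val)}" for col, val in conds.items()]

    where = conj(filters)            # filters -> WHERE
    where += conj(segment or {})     # segment -> AND'd into every row
    select = [f"f.{d}" for d in dimensions]
    group_by = list(select)          # dimensions -> GROUP BY
    from_ = "fact AS f"
    if compare:
        # compare -> each row is repeated once per branch id, kept only where it matches
        ids = ", ".join(f"({bind(seg_id)})" for seg_id, _ in compare)
        from_ += f" CROSS JOIN (VALUES {ids}) AS cmp(segment_id)"
        branches = [
            "(" + " AND ".join([f"cmp.segment_id = {bind(seg_id)}", *conj(cond)]) + ")"
            for seg_id, cond in compare
        ]
        where.append("(" + " OR ".join(branches) + ")")
        select.insert(0, "cmp.segment_id")
        group_by.insert(0, "cmp.segment_id")
    sql = f"SELECT {', '.join(select + [measure])} FROM {from_}"
    if where:
        sql += " WHERE " + " AND ".join(where)
    if group_by:
        sql += " GROUP BY " + ", ".join(group_by)
    return sql, binds
```

A customer matching two compare branches is counted in both groups. This is the "one tagged row per matching branch" behaviour described above.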
Available Dimensions

Each metric's cube declares which dimensions are available. Common dimensions across metric models:

| Dimension | Column | Join required | Example |
|---|---|---|---|
| `source_id` | fact table `source_id` | none | Multi-source deployments |
| `currency` | fact table `currency` | none | Per-currency amounts (uses `*_cents`) |
| `plan_id` | `sub.plan_id` | subscription | MRR per plan |
| `plan_interval` | `p.interval` | subscription → plan | Monthly vs. annual |
| `customer_country` | `c.country` | customer | Churn rate by country |
| `movement_type` | `m.movement_type` | none (MRR movement only) | MRR breakdown |
| `cohort_month` | `rc.cohort_month` | none (retention only) | Cohort matrix |
| `mrr_band` / `arr_band` | CASE expression on `mrr_base_cents` | customer (for `c.created_at`) | Bucketed segmentation |
| `tenure_months` | `AGE(now, c.created_at)` | customer | Age cohorts |
Computed dimensions (mrr_band, arr_band, tenure_months, cohort_month on the MRR/Churn cubes; customer_created_month on LtvInvoiceCube) are SQL expressions on Dim.column — they appear in the /fields discovery endpoint alongside regular dimensions.
Multiple dimensions can be combined: dimensions=["plan_interval", "customer_country"] gives MRR for each interval × country combination.
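As an example of a computed dimension, the sketch below builds the kind of CASE expression an `mrr_band` column could hold. The band edges and label format are invented for illustration, because tidemill's actual bands are not listed here. The hypothetical `band_case_sql` helper emits SQL text rather than the SQLAlchemy expression a real `Dim.column` would carry.

```python
def band_case_sql(column: str, edges_cents: list[int], unit: str = "$") -> str:
    """Build a CASE expression bucketing `column` (in cents) into labelled bands.

    edges_cents are ascending, exclusive upper bounds (hypothetical values);
    anything at or above the last edge falls into an open-ended top band.
    """
    if not edges_cents or edges_cents != sorted(edges_cents):
        raise ValueError("edges_cents must be a non-empty ascending list")
    whens = []
    lower = 0
    for upper in edges_cents:
        # Labels are shown in whole currency units for readability.
        label = f"{unit}{lower // 100}-{unit}{upper // 100}"
        whens.append(f"WHEN {column} < {upper} THEN '{label}'")
        lower = upper
    top = f"{unit}{lower // 100}+"
    return "CASE " + " ".join(whens) + f" ELSE '{top}' END"
```

`band_case_sql("mrr_base_cents", [10000, 100000])` yields three bands: under $100, $100 to $1000, and $1000+. Because the result is an ordinary column expression, it can be grouped on like any stored dimension.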
Compare-mode return shape (ratio metrics)
For aggregate metrics (MRR, MRR breakdown, MRR series), compare mode returns one row per segment_id. The metric's measures are computed separately for each branch, directly in SQL.
For ratio metrics (logo / revenue churn rate, NRR / GRR, ARPU, simple LTV, trial conversion rate), the numerator and denominator are separate sub-queries. Each sub-query runs through build_spec_fragment, so both pick up the same compare payload. The metric then divides per segment_id in Python and returns a list:
```python
[
    {"segment_id": "seg_a", "logo_churn_rate": 0.038, "churn_count": 12, "active_at_start": 320},
    {"segment_id": "seg_b", "logo_churn_rate": 0.057, "churn_count": 8, "active_at_start": 140},
]
```
tidemill/metrics/churn/metric.py is the reference — _logo_churn, _revenue_churn, _active_at_start_count, and _mrr_at_start_per_segment all accept the spec and return per-segment dicts when compare is set. The same pattern is used in LtvMetric._historical_arpu, RetentionMetric._revenue_retention, and TrialsMetric._funnel.
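The per-segment division can be sketched roughly as follows. This assumes each sub-query returns a list of dicts keyed by `segment_id`. The function name `ratio_per_segment` is hypothetical, and so is returning `None` for a zero denominator. That is one reasonable treatment of the zero-customer edge case, not necessarily tidemill's.

```python
def ratio_per_segment(
    numerators: list[dict],
    denominators: list[dict],
    num_key: str,
    den_key: str,
    out_key: str,
) -> list[dict]:
    """Join two per-segment result sets on segment_id and divide (hypothetical helper)."""
    num_by_seg = {row["segment_id"]: row[num_key] for row in numerators}
    result = []
    for row in denominators:
        seg = row["segment_id"]
        # A GROUP BY over a branch with no churn events yields no row at all, so default to 0.
        num = num_by_seg.get(seg, 0)
        den = row[den_key]
        result.append({
            "segment_id": seg,
            out_key: (num / den) if den else None,  # undefined with zero customers at start
            num_key: num,
            den_key: den,
        })
    return result
```

Driving the result from the denominator rows means every segment that had customers at the start of the period appears in the output, even one with no churn. With the counts from the example above, the rates come out as 0.0375 and about 0.0571. These round to the 0.038 and 0.057 shown.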
Cohort-matrix caveat. Per-segment retention matrices would be a 4-D result (cohort × active month × customer × segment); the chart can't consume it today. RetentionMetric._cohort_matrix and LtvMetric._cohort_ltv strip compare (via _filter_only(spec)) and treat compare like a plain segment list — the matrix renders the union of customers matching any branch.
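The following small in-memory illustration shows those union semantics. It assumes compare branches are reduced to Python predicates over customer dicts. In tidemill they are `SegmentDef`s compiled to SQL, and `_filter_only` does the stripping. The helper `union_of_branches` is hypothetical.

```python
from typing import Callable

Customer = dict
Predicate = Callable[[Customer], bool]


def union_of_branches(customers: list[Customer], compare: list[tuple[str, Predicate]]) -> list[str]:
    """Ids of customers matching any compare branch, each listed once (hypothetical)."""
    # Unlike compare mode, a customer in two branches is not duplicated here.
    return [c["id"] for c in customers if any(pred(c) for _, pred in compare)]
```

For example, with branches "US" and "US or DE", a US customer appears once in the cohort matrix's population rather than once per branch.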
Return Shape

With dimensions:

```python
# engine.query("mrr", {"query_type": "current"}, spec=QuerySpec(dimensions=["plan_id"]))
[
    {"plan_id": "plan_starter", "mrr": 2900.00},
    {"plan_id": "plan_professional", "mrr": 6320.00},
    {"plan_id": "plan_enterprise", "mrr": 3230.00},
]

# dimensions=["plan_interval", "customer_country"]
[
    {"plan_interval": "monthly", "customer_country": "US", "mrr": 4100.00},
    {"plan_interval": "monthly", "customer_country": "DE", "mrr": 980.00},
    {"plan_interval": "yearly", "customer_country": "US", "mrr": 5200.00},
    ...
]
```

Without dimensions, the result keeps its unsliced shape: a scalar for current-value queries or a time series for series queries.
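Client code often wants the dimensioned list keyed by its dimension values. Below is a minimal sketch of that reshaping, using the rows from the two-dimension example. The helper `index_by_dimensions` is hypothetical and not part of the engine.

```python
def index_by_dimensions(rows: list[dict], dimensions: list[str], measure: str) -> dict[tuple, float]:
    """Key each row's measure by the tuple of its dimension values (hypothetical helper)."""
    out: dict[tuple, float] = {}
    for row in rows:
        key = tuple(row[d] for d in dimensions)
        if key in out:
            # GROUP BY output should never repeat a combination; fail loudly if it does.
            raise ValueError(f"duplicate dimension combination: {key}")
        out[key] = row[measure]
    return out


rows = [
    {"plan_interval": "monthly", "customer_country": "US", "mrr": 4100.00},
    {"plan_interval": "monthly", "customer_country": "DE", "mrr": 980.00},
    {"plan_interval": "yearly", "customer_country": "US", "mrr": 5200.00},
]
by_combo = index_by_dimensions(rows, ["plan_interval", "customer_country"], "mrr")
```

`by_combo[("monthly", "US")]` is then a direct lookup. Summing the values recovers the total across the listed combinations.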
Cubes & Query Algebra
All metrics build their SQL through cubes and composable query fragments (tidemill/metrics/query.py). Each metric declares a Cube that defines the available joins, measures, and dimensions for its fact table. Query methods compose immutable QueryFragment objects — each fragment carries column expressions, filters, and required joins. The compiler resolves joins in dependency order and emits a SQLAlchemy Select.
See Cubes & Query Algebra for the full approach: model definitions, fragment algebra, compilation pipeline, and concrete models for all metric tables.
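A toy version of the fragment idea is shown below. It assumes a fixed join graph in which `plan` depends on `subscription` and `customer` stands alone. It renders SQL text instead of a SQLAlchemy `Select`. `Fragment`, `JOIN_DEPS`, `JOIN_SQL` and the table and column names are illustrative only, not the definitions in `tidemill/metrics/query.py`.

```python
from __future__ import annotations

from dataclasses import dataclass
from graphlib import TopologicalSorter

# Illustrative join graph: each join names the joins it depends on.
JOIN_DEPS: dict[str, set[str]] = {"subscription": set(), "plan": {"subscription"}, "customer": set()}
JOIN_SQL: dict[str, str] = {
    "subscription": "JOIN subscriptions AS sub ON sub.id = f.subscription_id",
    "plan": "JOIN plans AS p ON p.id = sub.plan_id",
    "customer": "JOIN customers AS c ON c.id = f.customer_id",
}


@dataclass(frozen=True)
class Fragment:
    """Immutable bundle of SELECT columns, filters, GROUP BY terms and required joins."""

    columns: tuple[str, ...] = ()
    filters: tuple[str, ...] = ()
    group_by: tuple[str, ...] = ()
    joins: frozenset[str] = frozenset()

    def __add__(self, other: Fragment) -> Fragment:
        # Composition builds a new fragment; neither operand is modified.
        return Fragment(
            self.columns + other.columns,
            self.filters + other.filters,
            self.group_by + other.group_by,
            self.joins | other.joins,
        )

    def compile(self, table: str) -> str:
        # Close over transitive join dependencies (plan needs subscription).
        needed: set[str] = set()
        stack = list(self.joins)
        while stack:
            join = stack.pop()
            if join not in needed:
                needed.add(join)
                stack.extend(JOIN_DEPS[join])
        # Emit each join only after the joins it depends on.
        order = TopologicalSorter({j: JOIN_DEPS[j] for j in needed}).static_order()
        sql = f"SELECT {', '.join(self.columns)} FROM {table} AS f"
        for join in order:
            sql += " " + JOIN_SQL[join]
        if self.filters:
            sql += " WHERE " + " AND ".join(self.filters)
        if self.group_by:
            sql += " GROUP BY " + ", ".join(self.group_by)
        return sql
```

Because fragments are frozen, a piece such as a "by plan interval" fragment can be reused across queries without one query's joins leaking into another.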
Dependencies
The dependencies property declares which other metrics must be initialized before this one. The engine performs a topological sort and injects resolved instances.
```python
@register
class RetentionMetric(Metric):
    name = "retention"
    dependencies = ["mrr"]  # NRR/GRR queries metric_mrr_movement

    # connector: optional DatabaseConnector, only passed in same-database mode
    def init(self, *, db, deps, connector=None):
        self.db = db
        self.mrr = deps["mrr"]  # injected MRR metric instance

    async def query(self, params, spec=None):
        # Can await self.mrr.query(...) or query metric_mrr_movement directly
        ...


@register
class LtvMetric(Metric):
    name = "ltv"
    model = LtvInvoiceCube
    dependencies = ["mrr", "churn"]  # ARPU reads MRR snapshot; simple LTV uses churn rate

    def init(self, *, db, deps):
        self.db = db
        self.mrr = deps["mrr"]
        self.churn = deps["churn"]

    async def query(self, params, spec=None):
        # ARPU: queries MRRSnapshotCube (mrr + customer_count measures).
        # Simple LTV: ARPU / logo_churn_rate; delegates to the churn metric.
        # Cohort LTV: MRRMovementCube for cohort assignment + LtvInvoiceCube for revenue.
        ...
```
Dependency graph for built-in metrics:
```text
mrr ───┬──────────────► retention  (NRR/GRR query MRRSnapshotCube + MRRMovementCube)
       └──────────────► ltv        (ARPU queries MRRSnapshotCube.mrr + .customer_count)
churn ────────────────► ltv        (simple LTV = ARPU / logo churn rate)
```
Trials has no dependencies — it only processes its own events.
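The engine-side ordering and injection can be sketched with the standard library's `graphlib.TopologicalSorter`, using the built-in dependency graph above. The `build_metrics` function and the stub classes are hypothetical. Only the `dependencies` attribute and the keyword-only `init(db=..., deps=...)` call mirror the examples in this section.

```python
from graphlib import TopologicalSorter


def build_metrics(classes: dict[str, type], db: object) -> dict[str, object]:
    """Instantiate metric classes in dependency order and inject their deps (hypothetical)."""
    graph = {name: set(cls.dependencies) for name, cls in classes.items()}
    instances: dict[str, object] = {}
    # static_order() raises graphlib.CycleError if the dependencies form a cycle.
    for name in TopologicalSorter(graph).static_order():
        if name not in classes:
            raise KeyError(f"'{name}' is listed as a dependency but is not registered")
        metric = classes[name]()
        metric.init(db=db, deps={dep: instances[dep] for dep in classes[name].dependencies})
        instances[name] = metric
    return instances


class StubMetric:
    """Minimal stand-in for Metric that records what the builder injected."""

    dependencies: list[str] = []

    def init(self, *, db, deps):
        self.db = db
        self.deps = deps


class Mrr(StubMetric):
    pass


class Churn(StubMetric):
    pass


class Trials(StubMetric):
    pass


class Retention(StubMetric):
    dependencies = ["mrr"]


class Ltv(StubMetric):
    dependencies = ["mrr", "churn"]
```

Since every dependency is instantiated before its dependents, `instances[dep]` is always populated by the time a metric's `init` runs. A cycle surfaces as an error at startup rather than at query time.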
Quick Ratio is not a registered metric. It's a post-processing derivation over MRR movements, exposed via tidemill.reports.mrr.quick_ratio and surfaced in the /api/metrics/summary response.
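For reference, the Quick Ratio is commonly defined as (new MRR + expansion MRR) / (contraction MRR + churned MRR). Some tools also count reactivation MRR as a gain. The sketch below assumes movement rows carry `movement_type` with the values `new`, `expansion`, `reactivation`, `contraction` and `churn`. It also assumes a signed `amount_cents` field. Both the type names and the field name are assumptions, and `tidemill.reports.mrr.quick_ratio` may differ.

```python
from __future__ import annotations


def quick_ratio(movements: list[dict], include_reactivation: bool = False) -> float | None:
    """(new + expansion [+ reactivation]) / (contraction + churn) over MRR movements (sketch)."""
    gains = {"new", "expansion"} | ({"reactivation"} if include_reactivation else set())
    losses = {"contraction", "churn"}
    gained = sum(m["amount_cents"] for m in movements if m["movement_type"] in gains)
    # Losses are assumed to be stored as negative amounts; use their magnitudes.
    lost = sum(abs(m["amount_cents"]) for m in movements if m["movement_type"] in losses)
    return gained / lost if lost else None  # undefined when no MRR was lost
```

A ratio above 1 means MRR added outpaced MRR lost over the period.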