Cubes & Query Algebra¶
Declarative query building: cubes define what's queryable, algebraic fragments compose queries. Last updated: April 2026
Overview¶
Metric queries need to aggregate fact tables (e.g., metric_mrr_snapshot) while slicing and filtering by columns that live in other tables (plans, customers, subscriptions). The naive approach — procedurally building JOINs and WHERE clauses — leads to duplicated join logic across metrics and fragile SQL construction.
This module solves the problem with a hybrid of two patterns:
- Cube — a class that declares all available joins, measures, and dimensions for a fact table. Defines what's queryable in one place.
- Algebraic Fragment Composition — queries are built by combining immutable `QueryFragment` objects with `+`. Each fragment carries column expressions, filters, and required joins. Fragments compose freely — order doesn't matter.
The cube owns the contract (available dimensions, join paths, measure definitions). Fragments provide composability (metrics build queries by combining pieces conditionally). The compiler turns a composed fragment into a SQLAlchemy Select.
Design Rationale¶
- Cube solves join resolution once. Pre-defined dimensions mean no ambiguity. The model is the single source of truth for what's queryable per fact table.
- Fragment composition replaces procedural building. Metrics declare what they need, not how to join. Conditional logic is clean: `q = base + (extra if condition else QueryFragment())`.
- SQL transparency comes free. `compile()` produces both the SQLAlchemy `Select` and a formatted SQL string. Every metric result can carry its SQL for auditability — the project's core differentiator.
- Serializable query specs. The API receives `{"dimensions": [...], "filters": {...}}` and the model validates + compiles it. No custom parsing.
- Each metric owns its cubes. MRR defines `MRRSnapshotCube` and `MRRMovementCube`. Churn defines `ChurnEventCube`. No shared global registry — each cube is self-contained.
Core Concepts¶
Cube¶
A Cube is a class that declares everything needed to query a fact table with dimensional slicing. It contains four nested declarations:
- Joins — how to reach dimension tables from the fact table, with dependency ordering
- Measures — named aggregation expressions (`Sum`, `CountDistinct`, `Avg`, `Count`)
- Dimensions — named columns for GROUP BY, each declaring which join it requires
- TimeDimensions — time columns that support granularity truncation (`DATE_TRUNC`)
class MRRSnapshotCube(Cube):
"""Cube for the MRR snapshot fact table."""
__source__ = "metric_mrr_snapshot"
__alias__ = "s"
class Joins:
subscription = Join("subscription", alias="sub",
on="sub.source_id = s.source_id AND sub.external_id = s.subscription_id")
plan = Join("plan", alias="p",
on="p.id = sub.plan_id", depends_on=["subscription"])
product = Join("product", alias="prod",
on="prod.id = p.product_id", depends_on=["plan"])
customer = Join("customer", alias="c",
on="c.source_id = s.source_id AND c.external_id = s.customer_id")
class Measures:
mrr = Sum("s.mrr_base_cents")
mrr_original = Sum("s.mrr_cents") # in original currency
count = CountDistinct("s.subscription_id")
class Dimensions:
source_id = Dim("s.source_id")
currency = Dim("s.currency")
plan_id = Dim("sub.plan_id", join="subscription")
plan_name = Dim("p.name", join="plan")
plan_interval = Dim("p.interval", join="plan")
pricing_model = Dim("p.pricing_model", join="plan") # flat | tiered | volume | usage_based
usage_type = Dim("p.usage_type", join="plan") # licensed | metered
product_name = Dim("prod.name", join="product") # via plan → product
customer_country = Dim("c.country", join="customer")
collection_method = Dim("sub.collection_method", join="subscription")
cancel_at_period_end = Dim("sub.cancel_at_period_end", join="subscription")
class TimeDimensions:
snapshot_at = TimeDim("s.snapshot_at")
Key properties:
- Join dependencies — `plan` depends on `subscription`, `product` depends on `plan`. Requesting `product_name` automatically pulls in all three joins in the correct order.
- Lazy joins — only dimensions/filters that are actually used trigger their joins. A query with no plan dimensions never joins the `plan` table.
- Introspection — the model can list its available dimensions and measures at runtime, enabling API validation and documentation generation.
Plan/product dimensions require plan ingestion
The MRR cubes declare plan_id, plan_name, plan_interval,
pricing_model, usage_type, and product_name, but these join
through the plan (and product) table. The Stripe connector
currently does not emit plan.* / product.* events, so
subscription.plan_id is NULL for Stripe-sourced data and any query
that references those dimensions returns zero rows. They will start
populating once plan/product ingestion lands — the cube declarations
are correct in advance. Same-database connectors (Lago, Kill Bill)
that query the billing engine's own plan tables are not affected.
MRRSnapshotCube.available_dimensions()
# ['cancel_at_period_end', 'collection_method', 'currency',
# 'customer_country', 'plan_id', 'plan_interval', 'plan_name',
# 'pricing_model', 'product_name', 'source_id', 'usage_type']
MRRSnapshotCube.available_measures()
# ['mrr', 'mrr_original', 'count']
MRRSnapshotCube.available_time_dimensions()
# ['snapshot_at']
QueryFragment¶
A QueryFragment is an immutable, composable description of a query piece. It carries what to SELECT, what to filter, and which joins are needed — but no SQL yet.
@dataclass(frozen=True)
class QueryFragment:
"""Immutable query fragment. Fragments compose via + (monoid)."""
source: str | None = None
alias: str | None = None
measures: tuple[MeasureExpr, ...] = ()
dimensions: tuple[DimExpr, ...] = ()
filters: tuple[FilterExpr, ...] = ()
joins: frozenset[str] = frozenset() # join names needed
time_grain: TimeGrainExpr | None = None
def __add__(self, other: QueryFragment) -> QueryFragment:
"""Merge two fragments. Joins are unioned, time_grain uses first non-None."""
return QueryFragment(
source=self.source or other.source,
alias=self.alias or other.alias,
measures=self.measures + other.measures,
dimensions=self.dimensions + other.dimensions,
filters=self.filters + other.filters,
joins=self.joins | other.joins,
time_grain=self.time_grain or other.time_grain,
)
Fragment Algebra¶
Fragments combine via + with these properties:
| Property | Meaning |
|---|---|
| Associative | (a + b) + c == a + (b + c) — grouping doesn't affect result |
| Commutative | a + b == b + a — order of composition doesn't matter |
| Identity | QueryFragment() + x == x — empty fragment changes nothing |
This makes fragments a commutative monoid, which means:
- You can store fragments in lists and `reduce()` them
- Conditional inclusion is trivial: `base + (f if condition else QueryFragment())`
- Order of adding dimensions/filters/measures never matters
- Fragments are independently testable
Fragment Constructors¶
The cube provides factory methods that return fragments. Each method encapsulates the column expression and the joins required to resolve it:
# Measure fragments — carry the aggregation expression
model.measures.mrr
# → QueryFragment(source="metric_mrr_snapshot", alias="s",
# measures=(MeasureExpr("SUM", "s.mrr_base_cents", label="mrr"),))
# Dimension fragments — carry column expression + required joins
model.dimension("plan_interval")
# → QueryFragment(dimensions=(DimExpr("p.interval", label="plan_interval"),),
# joins=frozenset({"subscription", "plan"}))
model.dimension("customer_country")
# → QueryFragment(dimensions=(DimExpr("c.country", label="customer_country"),),
# joins=frozenset({"customer"}))
# Filter fragments — carry predicate + required joins
model.filter("plan_interval", "=", "monthly")
# → QueryFragment(filters=(FilterExpr("p.interval", "=", "monthly"),),
# joins=frozenset({"subscription", "plan"}))
model.filter("customer_country", "in", ["US", "DE"])
# → QueryFragment(filters=(FilterExpr("c.country", "in", ["US", "DE"]),),
# joins=frozenset({"customer"}))
# Time grain fragment — carries DATE_TRUNC expression
model.time_grain("snapshot_at", "month")
# → QueryFragment(time_grain=TimeGrainExpr("s.snapshot_at", "month"))
# Raw filter on the source table (no join needed)
model.where("s.mrr_base_cents", ">", 0)
# → QueryFragment(filters=(FilterExpr("s.mrr_base_cents", ">", 0),))
Compilation¶
compile() turns a composed QueryFragment into a SQLAlchemy Select statement and a bind-params dict. The compilation pipeline:
- FROM — `SELECT ... FROM {source} AS {alias}`
- Resolve static joins — collect all join names (self + compare branches), topologically sort by `depends_on`, emit `JOIN` clauses. Deduplication is automatic (joins are a `frozenset`).
- Apply dynamic joins — emit one `LEFT JOIN` per unique `DynamicJoinExpr` alias. Used by `Cube.attribute()` to reach EAV attribute tables without declaring every attribute up-front. Always LEFT so `is_empty` / `IS NULL` semantics stay correct.
- Compare mode — if `fragment.compare` is non-empty, add `CROSS JOIN (VALUES ('seg_a'), ('seg_b')) AS seg(segment_id)` and prepend `seg.segment_id` to SELECT and GROUP BY.
- Add time grain — `DATE_TRUNC(granularity, column) AS period` in SELECT and GROUP BY (so `period` leads the projection).
- Add dimensions — column expressions in SELECT and GROUP BY.
- Add measures — aggregation expressions in SELECT (`SUM(...)`, `COUNT(DISTINCT ...)`, `COUNT(...)`, `AVG(...)`).
- Add filters — WHERE clauses with bound parameters. Non-compare filters AND together; when compare is set, an additional compound OR-of-ANDs is AND'd in (each branch tagged with `seg.segment_id = :branch_id`).
There is no ORDER BY in the algebra — ordering is handled by the caller or by post-processing in the metric. Also note: aggregates are emitted as raw SUM(col) / COUNT(DISTINCT col) — no unit conversion happens in the compiler (cents-to-dollars is the caller's responsibility).
A to_sql(model) helper compiles the statement against the PostgreSQL dialect and inline-substitutes bind parameters, which is useful for notebooks and SQL logging.
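The static-join resolution step amounts to a dependency expansion plus a topological sort over `depends_on`. A minimal sketch, assuming joins arrive as a `{name: (depends_on, ...)}` mapping (the helper name `resolve_joins` is illustrative, not the project's actual API):

```python
def resolve_joins(requested: set[str], deps: dict[str, tuple[str, ...]]) -> list[str]:
    """Expand transitive dependencies, then emit joins in dependency order."""
    # Expand: requesting `product` pulls in `plan` and `subscription` too.
    needed: set[str] = set()
    stack = list(requested)
    while stack:
        name = stack.pop()
        if name not in needed:
            needed.add(name)
            stack.extend(deps.get(name, ()))

    # Order: a join is emitted only after everything it depends on
    # (DFS post-order; input is a DAG by construction).
    ordered: list[str] = []

    def visit(name: str) -> None:
        if name in ordered:
            return
        for dep in deps.get(name, ()):
            visit(dep)
        ordered.append(name)

    for name in sorted(needed):  # sorted for deterministic output
        visit(name)
    return ordered


DEPS = {"subscription": (), "plan": ("subscription",),
        "product": ("plan",), "customer": ()}

resolve_joins({"product"}, DEPS)
# → ['subscription', 'plan', 'product']
```

This matches the behavior described for `product_name`: one requested join name expands into the full chain in the correct order, and the `frozenset` input means duplicates never reach the sort.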
Dynamic joins + attribute()¶
Cube.attribute(key, op, value, attr_type="string") produces a QueryFragment that adds a DynamicJoinExpr(alias="ca_{key}", table="customer_attribute", depends_on=("customer",)) and a filter against the matching value_* column (value_string, value_number, value_bool, value_timestamp). Two filters on the same key share one join — aliases are deduped by alias in __add__ and again at compile time. See segments.md for the full attribute EAV model.
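The alias and value-column selection described above can be sketched as a small lookup (helper name and return shape are assumptions; the real `attribute()` returns a full `QueryFragment`):

```python
VALUE_COLUMNS = {
    "string": "value_string",
    "number": "value_number",
    "bool": "value_bool",
    "timestamp": "value_timestamp",
}


def attribute_parts(key: str, attr_type: str = "string") -> tuple[str, str]:
    """Alias and value column for one EAV attribute filter.

    Two filters on the same key produce the same alias, which is why
    they dedupe to a single LEFT JOIN at compile time.
    """
    if attr_type not in VALUE_COLUMNS:
        raise ValueError(f"unknown attr_type {attr_type!r}")
    return f"ca_{key}", VALUE_COLUMNS[attr_type]


attribute_parts("industry")
# → ('ca_industry', 'value_string')
```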
OR groups via Cube.or_group()¶
Cube.or_group([frag_a, frag_b, …]) combines fragments with OR — each leg's filters AND together internally, legs are OR'd at the top. Implemented as a FilterExpr(kind="or", children=(...)) variant that _filter_clause handles uniformly with simple filters — no new compilation path.
Operator set¶
`FilterExpr` supports: `=` `!=` `>` `>=` `<` `<=` `in` `not in` `between` `contains` `not_contains` `starts_with` `ends_with` `is_empty` `is_not_empty`. All map cleanly to SQLAlchemy Core — `ILIKE` for the string ops (with `%` / `_` / `\` escaped in the bound value), `IS NULL` / `IS NOT NULL` for the empty ops (no bind param emitted). No raw `text()` beyond what static join declarations already use.
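The escaping for the string operators can be sketched as plain string munging on the bound value (a sketch, assuming the emitted SQL honours backslash escapes, e.g. via `ESCAPE '\'`; the real compiler builds this through SQLAlchemy Core):

```python
def like_pattern(value: str, op: str) -> str:
    """Escape LIKE metacharacters in the bound value, then wrap per operator."""
    # Backslash first, so the escapes we add next aren't themselves escaped.
    escaped = value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    if op in ("contains", "not_contains"):
        return f"%{escaped}%"
    if op == "starts_with":
        return f"{escaped}%"
    if op == "ends_with":
        return f"%{escaped}"
    raise ValueError(f"not a string operator: {op}")


like_pattern("100%_done", "contains")
# → '%100\\%\\_done%'  (i.e. the 9-char value gains two escape backslashes)
```

Escaping in the bound value rather than the SQL text keeps user input out of the statement entirely, so the statement cache still gets one shape per operator.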
Concrete Cubes¶
These are the cubes for the project's metric tables. Each maps a fact table to its available joins, measures, and dimensions. Every cube lives next to its metric (e.g., tidemill/metrics/mrr/cubes.py, tidemill/metrics/churn/cubes.py).
Dimensions sourced from Stripe API objects: Customer (name, address.country), Subscription (status, collection_method, cancel_at_period_end, cancellation_details), Price/Plan (interval, interval_count, billing_scheme → canonical pricing_model, usage_type), Product (name). See Stripe API mapping below for the full field comparison.
MRR Snapshot Cube¶
class MRRSnapshotCube(Cube):
"""Current MRR per subscription. Updated on every subscription event."""
__source__ = "metric_mrr_snapshot"
__alias__ = "s"
class Joins:
subscription = Join("subscription", alias="sub",
on="sub.source_id = s.source_id AND sub.external_id = s.subscription_id")
plan = Join("plan", alias="p",
on="p.id = sub.plan_id", depends_on=["subscription"])
product = Join("product", alias="prod",
on="prod.id = p.product_id", depends_on=["plan"])
customer = Join("customer", alias="c",
on="c.source_id = s.source_id AND c.external_id = s.customer_id")
class Measures:
mrr = Sum("s.mrr_base_cents", label="mrr")
mrr_original = Sum("s.mrr_cents", label="mrr_original")
count = CountDistinct("s.subscription_id", label="subscription_count")
customer_count = CountDistinct("s.customer_id", label="customer_count")
class Dimensions:
source_id = Dim("s.source_id")
currency = Dim("s.currency")
# Plan (subscription → plan)
plan_id = Dim("sub.plan_id", join="subscription")
plan_name = Dim("p.name", join="plan", label="plan_name")
plan_interval = Dim("p.interval", join="plan", label="plan_interval")
pricing_model = Dim("p.pricing_model", join="plan") # flat | tiered | volume | usage_based
usage_type = Dim("p.usage_type", join="plan") # licensed | metered
# Product (subscription → plan → product)
product_name = Dim("prod.name", join="product", label="product_name")
# Customer
customer_name = Dim("c.name", join="customer", label="customer_name")
customer_country = Dim("c.country", join="customer", label="customer_country")
# Subscription attributes
collection_method = Dim("sub.collection_method", join="subscription")
cancel_at_period_end = Dim("sub.cancel_at_period_end", join="subscription")
# Computed — CASE / DATE_TRUNC expressions on c.created_at and the
# MRR column. Surfaced through the same /fields endpoint as regular
# dims; segment definitions reach them via the `computed.<name>`
# field prefix (or bare name for backwards compatibility).
mrr_band = Dim(_mrr_band_sql("s.mrr_base_cents"), join="customer", label="MRR band")
arr_band = Dim(_arr_band_sql("s.mrr_base_cents"), join="customer", label="ARR band")
tenure_months = Dim(_TENURE_MONTHS_SQL, join="customer", label="Tenure (months)")
cohort_month = Dim(_COHORT_MONTH_SQL, join="customer", label="Cohort month")
class TimeDimensions:
snapshot_at = TimeDim("s.snapshot_at")
The customer_count measure exists so LTV/ARPU queries can compute SUM(mrr) / COUNT(DISTINCT customer_id) in a single trip to the DB.
Computed dimensions (mrr_band, arr_band, tenure_months, cohort_month) are SQL expressions stored verbatim in Dim.column. literal_column accepts arbitrary expressions, so dimension("mrr_band") and filter("mrr_band", "=", "$100-$500") work the same as any column-backed dim. The expressions live in tidemill/metrics/mrr/cubes.py (_mrr_band_sql, _arr_band_sql, _TENURE_MONTHS_SQL, _COHORT_MONTH_SQL) and are reused across the Churn, LTV, Retention, and Trials cubes wherever a customer join is available. See Segmentation for how segment definitions reach them.
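A band helper is just a CASE-expression builder. A minimal sketch of what `_mrr_band_sql` might look like — the thresholds and labels here are illustrative, not the project's actual bands:

```python
def mrr_band_sql(col: str) -> str:
    """Bucket a cents column into labelled MRR bands via a SQL CASE.

    Thresholds/labels are illustrative; the real bands live in
    tidemill/metrics/mrr/cubes.py.
    """
    bands = [
        (10_000, "$0-$100"),     # < $100 MRR
        (50_000, "$100-$500"),
        (100_000, "$500-$1k"),
    ]
    whens = " ".join(
        f"WHEN {col} < {cents} THEN '{label}'" for cents, label in bands
    )
    return f"CASE {whens} ELSE '$1k+' END"
```

Because the result is stored verbatim in `Dim.column` and compiled through `literal_column`, it groups and filters exactly like a plain column.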
MRR Movement Cube¶
class MRRMovementCube(Cube):
"""Append-only log of MRR changes. Used for breakdown and time-series queries."""
__source__ = "metric_mrr_movement"
__alias__ = "m"
class Joins:
subscription = Join("subscription", alias="sub",
on="sub.source_id = m.source_id AND sub.external_id = m.subscription_id")
plan = Join("plan", alias="p",
on="p.id = sub.plan_id", depends_on=["subscription"])
product = Join("product", alias="prod",
on="prod.id = p.product_id", depends_on=["plan"])
customer = Join("customer", alias="c",
on="c.source_id = m.source_id AND c.external_id = m.customer_id")
class Measures:
amount = Sum("m.amount_base_cents", label="amount_base")
amount_original = Sum("m.amount_cents", label="amount_original")
count = CountDistinct("m.event_id", label="event_count")
class Dimensions:
source_id = Dim("m.source_id")
customer_id = Dim("m.customer_id")
currency = Dim("m.currency")
movement_type = Dim("m.movement_type")
plan_id = Dim("sub.plan_id", join="subscription")
plan_name = Dim("p.name", join="plan", label="plan_name")
plan_interval = Dim("p.interval", join="plan", label="plan_interval")
pricing_model = Dim("p.pricing_model", join="plan")
usage_type = Dim("p.usage_type", join="plan")
product_name = Dim("prod.name", join="product", label="product_name")
customer_name = Dim("c.name", join="customer", label="customer_name")
customer_country = Dim("c.country", join="customer", label="customer_country")
collection_method = Dim("sub.collection_method", join="subscription")
class TimeDimensions:
occurred_at = TimeDim("m.occurred_at")
Churn Customer State Cube¶
class ChurnCustomerStateCube(Cube):
"""Active/churned state per customer. Updated by subscription events."""
__source__ = "metric_churn_customer_state"
__alias__ = "cs"
class Joins:
customer = Join("customer", alias="c",
on="c.source_id = cs.source_id AND c.external_id = cs.customer_id")
class Measures:
count = CountDistinct("cs.customer_id", label="customer_count")
class Dimensions:
source_id = Dim("cs.source_id")
customer_id = Dim("cs.customer_id")
customer_name = Dim("c.name", join="customer", label="customer_name")
customer_country = Dim("c.country", join="customer", label="customer_country")
class TimeDimensions:
first_active_at = TimeDim("cs.first_active_at")
churned_at = TimeDim("cs.churned_at")
Churn Event Cube¶
class ChurnEventCube(Cube):
"""Individual churn events for rate calculation."""
__source__ = "metric_churn_event"
__alias__ = "ce"
class Joins:
customer = Join("customer", alias="c",
on="c.source_id = ce.source_id AND c.external_id = ce.customer_id")
customer_state = Join("metric_churn_customer_state", alias="cs",
on="cs.source_id = ce.source_id AND cs.customer_id = ce.customer_id")
class Measures:
count = Count("*", label="churn_count")
revenue_lost = Sum("ce.mrr_cents", label="revenue_lost")
class Dimensions:
source_id = Dim("ce.source_id")
customer_id = Dim("ce.customer_id")
churn_type = Dim("ce.churn_type")
cancel_reason = Dim("ce.cancel_reason") # from cancellation_details.feedback; populated on logo, revenue, and canceled rows
customer_name = Dim("c.name", join="customer", label="customer_name")
customer_country = Dim("c.country", join="customer", label="customer_country")
# Scopes churn events to customers active at period start
customer_first_active = Dim("cs.first_active_at", join="customer_state",
label="customer_first_active")
class TimeDimensions:
occurred_at = TimeDim("ce.occurred_at")
The customer_state join plus the customer_first_active dimension let the churn metric restrict events to customers who were already active before the measurement window — essential for correct logo-churn denominators.
Retention Cohort Cube¶
class RetentionCohortCube(Cube):
"""Cohort membership and monthly activity for retention analysis."""
__source__ = "metric_retention_cohort"
__alias__ = "rc"
class Joins:
activity = Join("metric_retention_activity", alias="ra",
on="ra.customer_id = rc.customer_id AND ra.source_id = rc.source_id")
customer = Join("customer", alias="c",
on="c.source_id = rc.source_id AND c.external_id = rc.customer_id")
class Measures:
cohort_size = CountDistinct("rc.customer_id", label="cohort_size")
active_count = CountDistinct("ra.customer_id", label="active_count")
class Dimensions:
source_id = Dim("rc.source_id")
customer_id = Dim("rc.customer_id")
cohort_month = Dim("rc.cohort_month")
active_month = Dim("ra.active_month", join="activity")
customer_country = Dim("c.country", join="customer", label="customer_country")
class TimeDimensions:
cohort_month_time = TimeDim("rc.cohort_month")
LTV Invoice Cube¶
class LtvInvoiceCube(Cube):
"""Append-only log of paid invoices. Used for cohort LTV and revenue tracking."""
__source__ = "metric_ltv_invoice"
__alias__ = "li"
class Joins:
customer = Join("customer", alias="c",
on="c.source_id = li.source_id AND c.external_id = li.customer_id")
cohort = Join("metric_retention_cohort", alias="rc",
on="rc.source_id = li.source_id AND rc.customer_id = li.customer_id")
class Measures:
total_revenue = Sum("li.amount_base_cents", label="total_revenue")
total_revenue_original = Sum("li.amount_cents", label="total_revenue_original")
invoice_count = Count("*", label="invoice_count")
customer_count = CountDistinct("li.customer_id", label="customer_count")
class Dimensions:
source_id = Dim("li.source_id")
customer_id = Dim("li.customer_id")
currency = Dim("li.currency")
customer_country = Dim("c.country", join="customer", label="customer_country")
cohort_month = Dim("rc.cohort_month", join="cohort", label="cohort_month")
class TimeDimensions:
paid_at = TimeDim("li.paid_at")
The cohort join reaches metric_retention_cohort — the same table used by the retention metric — so LTV and retention share cohort definitions. Cohort LTV currently computes cohort membership in Python (movement_type = 'new' in MRRMovementCube) for flexibility; the cohort_month dimension is available but unused by the default cohort-LTV query.
Trial Cube¶
class TrialCube(Cube):
"""Per-trial outcomes. Cohort = month of started_at."""
__source__ = "metric_trial"
__alias__ = "t"
class Joins:
customer = Join("customer", alias="c",
on="c.source_id = t.source_id AND c.external_id = t.customer_id")
class Measures:
# count(*) counts all rows; count(col) counts non-null values.
# Every row has started_at, so started_count == count(*).
started_count = Count("*", label="started")
converted_count = Count("t.converted_at", label="converted")
expired_count = Count("t.expired_at", label="expired")
customer_count = CountDistinct("t.customer_id", label="customer_count")
class Dimensions:
source_id = Dim("t.source_id")
customer_country = Dim("c.country", join="customer", label="customer_country")
class TimeDimensions:
started_at = TimeDim("t.started_at")
converted_at = TimeDim("t.converted_at")
expired_at = TimeDim("t.expired_at")
Trial metrics are cohort-based: the funnel measures (started, converted, expired) are derived from COUNT(*) and COUNT(<timestamp>) over the same row set, filtered by started_at. A trial that starts in January and converts in March is attributed to the January cohort regardless of when the conversion lands.
Stripe API Mapping¶
The table below maps Stripe API fields to our schema columns and cube dimensions. Fields marked with dim are exposed as queryable dimensions in the cubes above.
Customer → customer table¶
| Stripe field | Our column | Cube dimension | Notes |
|---|---|---|---|
| `id` | `external_id` | — | Stripe customer ID |
| `name` | `name` | — | |
| `email` | `email` | — | |
| `address.country` | `country` | `customer_country` | ISO 3166-1 alpha-2 |
| `currency` | `currency` | `currency` | Default billing currency |
| `created` | `created_at` | — | Cohort assignment |
| `metadata` | `metadata` | — | Custom business dimensions (JSONB) |
| `delinquent` | — | — | Could add for payment health segmentation |
| `address.state`/`city` | — | — | Sub-national geo; use metadata if needed |
Product → product table (new)¶
| Stripe field | Our column | Cube dimension | Notes |
|---|---|---|---|
| `id` | `external_id` | — | Stripe product ID |
| `name` | `name` | `product_name` | Product-level MRR/churn analysis |
| `active` | `active` | — | |
| `metadata` | `metadata` | — | Product line, category |
Price/Plan → plan table¶
| Stripe field | Our column | Cube dimension | Notes |
|---|---|---|---|
| `id` | `external_id` | `plan_id` | Stripe price/plan ID |
| `product` | `product_id` | (via product join) | FK to product table |
| `nickname` / `product.name` | `name` | `plan_name` | Human-readable plan name |
| `recurring.interval` | `interval` | `plan_interval` | month, year, week, day |
| `recurring.interval_count` | `interval_count` | — | 1=monthly, 3=quarterly, 12=annual |
| `unit_amount` | `amount_cents` | — | Per-unit price |
| `currency` | `currency` | — | |
| `billing_scheme` | `pricing_model` | `pricing_model` | Stripe per_unit/tiered → canonical flat/tiered/volume/usage_based |
| `recurring.usage_type` | `usage_type` | `usage_type` | licensed, metered |
| `type` | — | — | one_time vs recurring; filter at ingest |
| `trial_period_days` | `trial_period_days` | — | |
| `tiers` | — | — | Tiered pricing detail; store in metadata |
| `metadata` | `metadata` | — | Custom plan attributes |
Subscription → subscription table¶
| Stripe field | Our column | Cube dimension | Notes |
|---|---|---|---|
| `id` | `external_id` | — | Stripe subscription ID |
| `status` | `status` | — | active, trialing, past_due, canceled, etc. |
| `items[].price` × quantity | `mrr_cents` / `mrr_base_cents` | (measure) | Computed at ingest time |
| `items[].quantity` | `quantity` | — | Seat/unit count |
| `currency` | `currency` | — | |
| `collection_method` | `collection_method` | `collection_method` | charge_automatically, send_invoice |
| `cancel_at_period_end` | `cancel_at_period_end` | `cancel_at_period_end` | Churn early warning |
| `cancellation_details.reason` | `cancel_reason` | — | customer, payment_failure, etc. |
| `cancellation_details.feedback` | `cancel_feedback` | — | too_expensive, missing_features, etc. |
| `start_date` | `started_at` | — | |
| `trial_start` / `trial_end` | `trial_start` / `trial_end` | — | |
| `canceled_at` | `canceled_at` | — | |
| `ended_at` | `ended_at` | — | |
| `current_period_start/end` | `current_period_start/end` | — | |
| `metadata` | — | — | Store in event payload |
Churn Events → metric_churn_event table¶
| Stripe field | Our column | Cube dimension | Notes |
|---|---|---|---|
| `cancellation_details.reason` | `cancel_reason` | `cancel_reason` | Denormalized from subscription for direct churn analysis |
Not captured (available via metadata or future extension)¶
| Stripe field | Analytics value | Priority |
|---|---|---|
| `charge.payment_method_details.card.brand` | Payment success by card brand | P1 |
| `charge.payment_method_details.card.country` | Card issuing country vs billing country | P1 |
| `charge.outcome.risk_level` | Fraud/risk segmentation | P2 |
| `invoice.billing_reason` | Distinguish cycle vs proration vs manual | P1 |
| `invoice.amount_paid` / `amount_remaining` | AR aging, collection rate | P1 |
| `invoice.total_discount_amounts` | Discount impact on revenue | P2 |
| `customer.delinquent` | Payment health flag | P2 |
| `price.tiers` / `tiers_mode` | Tiered pricing analysis | P2 |
Usage in Metrics¶
Metrics declare which cube they use. The query() method composes fragments from the model based on the incoming QuerySpec:
@register
class MrrMetric(Metric):
model = MRRSnapshotCube
movement_model = MRRMovementCube
async def query(self, params: dict, spec: QuerySpec | None = None) -> Any:
match params.get("query_type"):
case "current":
return await self._current_mrr(params.get("at"), spec)
case "series":
return await self._mrr_series(
params["start"], params["end"], params["interval"], spec)
case "breakdown":
return await self._mrr_breakdown(params["start"], params["end"], spec)
async def _current_mrr(self, at: date | None, spec: QuerySpec | None):
# Choose original-currency measure when caller groups by currency
use_original = spec and "currency" in (spec.dimensions or [])
m = self.model
measure = m.measures.mrr_original if use_original else m.measures.mrr
# Base: always-present fragments
q = measure + m.where("s.mrr_base_cents", ">", 0)
# Time filter
if at:
q = q + m.filter("snapshot_at", "<=", at)
# Dimensions + filters + segment + compare from the spec, in one
# bridge call (see tidemill/segments/compiler.py).
q = q + await build_spec_fragment(m, spec, self.db)
stmt, params = q.compile(m)
result = await self.db.execute(stmt, params)
rows = result.mappings().all()
has_compare = bool(spec and spec.compare)
if not spec or (not spec.dimensions and not has_compare):
return rows[0]["mrr"] if rows else 0
return [dict(r) for r in rows]
async def _mrr_breakdown(self, start: date, end: date, spec: QuerySpec | None):
mm = self.movement_model
q = (
mm.measures.amount
+ mm.dimension("movement_type")
+ mm.filter("occurred_at", "between", (start, end))
)
q = q + await build_spec_fragment(mm, spec, self.db)
stmt, params = q.compile(mm)
result = await self.db.execute(stmt, params)
return [dict(r) for r in result.mappings().all()]
Reusable Fragments¶
Common filter combinations can be stored as named fragments and reused across metrics:
# Shared fragment: active subscriptions only
ACTIVE_ONLY = MRRSnapshotCube.where("s.mrr_base_cents", ">", 0)
# Shared fragment: monthly plans
MONTHLY_PLANS = MRRSnapshotCube.filter("plan_interval", "=", "monthly")
# Compose freely
q = MRRSnapshotCube.measures.mrr + ACTIVE_ONLY + MONTHLY_PLANS
QuickRatio Example¶
The QuickRatio metric composes fragments from the MRR movement model:
@register
class QuickRatioMetric(Metric):
model = MRRMovementCube
dependencies = ["mrr"]
async def query(self, params: dict, spec: QuerySpec | None = None) -> Any:
m = self.model
q = (
m.measures.amount
+ m.dimension("movement_type")
+ m.filter("occurred_at", "between", (params["start"], params["end"]))
)
q = q + await build_spec_fragment(m, spec, self.db)
stmt, bind = q.compile(m)
rows = (await self.db.execute(stmt, bind)).mappings().all()
by_type = {r["movement_type"]: r["amount_base"] for r in rows}
growth = sum(by_type.get(t, 0) for t in ("new", "expansion", "reactivation"))
loss = abs(sum(by_type.get(t, 0) for t in ("churn", "contraction")))
return growth / loss if loss else None
QuerySpec¶
QuerySpec is the external API contract — what FastAPI/CLI users pass in. It references dimension and filter names from the metric's cube:
@dataclass
class QuerySpec:
"""Declarative query specification. Validated against the metric's Cube."""
# Dimensions to group by (names from model.Dimensions)
dimensions: list[str] = field(default_factory=list)
# Filters: dimension_name → value (equality) or {op: value}
filters: dict[str, Any] = field(default_factory=dict)
# Time bucketing
granularity: str | None = None # day | week | month | quarter | year
time_range: tuple[str, str] | None = None
The model's apply_spec() method translates the dimensions / filters / granularity fields of a QuerySpec into a composed fragment. Validation happens inside Cube.dimension(), Cube.filter(), and Cube.time_grain() — each raises ValueError with the list of available names when given an unknown one.
@classmethod
def apply_spec(cls, spec: QuerySpec | None) -> QueryFragment:
"""Translate a QuerySpec into a composed QueryFragment."""
if spec is None:
return QueryFragment()
result = QueryFragment()
for dim_name in spec.dimensions or []:
result = result + cls.dimension(dim_name)
for field_name, value in (spec.filters or {}).items():
if isinstance(value, dict):
op = next(iter(value))
result = result + cls.filter(field_name, op, value[op])
else:
result = result + cls.filter(field_name, "=", value)
if spec.granularity:
time_dims = sorted(cls._time_dimensions)
if time_dims:
result = result + cls.time_grain(time_dims[0], spec.granularity)
return result
apply_spec only handles the dim / filter / granularity layer. Metric query methods should call await build_spec_fragment(cube, spec, db) (in tidemill.segments.compiler) — that helper layers apply_spec plus the segment (universe filter) and compare (per-branch CROSS JOIN) carried by the spec, looking up attribute types from the registry as needed. Calling apply_spec directly skips segments and compare-mode entirely.
API Integration¶
The REST API receives a JSON query spec and delegates to the engine:
POST /api/metrics/mrr
{
"query_type": "current",
"dimensions": ["plan_interval", "customer_country"],
"filters": {"customer_country": "US"},
"granularity": "month"
}
FastAPI translates this to:
spec = QuerySpec(
dimensions=["plan_interval", "customer_country"],
filters={"customer_country": "US"},
granularity="month",
)
result = await engine.query("mrr", {"query_type": "current"}, spec=spec)
The model validates dimension/filter names before compilation. Invalid names produce clear error messages:
ValueError: Unknown dimension 'plan_name'. Available: ['currency', 'customer_country', 'plan_id', 'plan_interval', 'source_id']
SQL Examples¶
Money values are stored as cents (integer); the compiler does not divide — cents-to-dollars conversion is the caller's responsibility.
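Since aggregates come back as raw cent sums, unit conversion happens in the caller. A minimal sketch using `Decimal` to keep the division exact (helper name illustrative):

```python
from decimal import Decimal


def cents_to_dollars(cents: int) -> Decimal:
    """Exact cents → dollars conversion; the compiler never does this."""
    return Decimal(cents) / 100


cents_to_dollars(123_456_789)
# → Decimal('1234567.89')
```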
No spec — plain aggregate¶
q = model.measures.mrr + model.where("s.mrr_base_cents", ">", 0)
SELECT SUM(s.mrr_base_cents) AS mrr
FROM metric_mrr_snapshot s
WHERE s.mrr_base_cents > 0
Filter only (no dimensions)¶
q = model.measures.mrr + model.where("s.mrr_base_cents", ">", 0) + model.filter("plan_interval", "=", "yearly")
SELECT SUM(s.mrr_base_cents) AS mrr
FROM metric_mrr_snapshot s
JOIN subscription sub ON sub.source_id = s.source_id
AND sub.external_id = s.subscription_id
JOIN plan p ON p.id = sub.plan_id
WHERE s.mrr_base_cents > 0
AND p.interval = :plan_interval
Dimensional cut¶
q = (model.measures.mrr + model.where("s.mrr_base_cents", ">", 0)
+ model.dimension("plan_interval") + model.dimension("customer_country"))
SELECT p.interval AS plan_interval,
c.country AS customer_country,
SUM(s.mrr_base_cents) AS mrr
FROM metric_mrr_snapshot s
JOIN subscription sub ON sub.source_id = s.source_id
AND sub.external_id = s.subscription_id
JOIN plan p ON p.id = sub.plan_id
JOIN customer c ON c.source_id = s.source_id
AND c.external_id = s.customer_id
WHERE s.mrr_base_cents > 0
GROUP BY p.interval, c.country
Filter + dimension + time grain¶
q = (model.measures.mrr + model.where("s.mrr_base_cents", ">", 0)
+ model.filter("customer_country", "in", ["US", "DE"])
+ model.dimension("plan_id")
+ model.time_grain("snapshot_at", "month"))
SELECT DATE_TRUNC('month', s.snapshot_at) AS period,
sub.plan_id,
SUM(s.mrr_base_cents) AS mrr
FROM metric_mrr_snapshot s
JOIN subscription sub ON sub.source_id = s.source_id
AND sub.external_id = s.subscription_id
JOIN customer c ON c.source_id = s.source_id
AND c.external_id = s.customer_id
WHERE s.mrr_base_cents > 0
AND c.country IN :customer_country
GROUP BY DATE_TRUNC('month', s.snapshot_at), sub.plan_id
MRR breakdown by movement type + plan¶
mm = MRRMovementCube
q = (mm.measures.amount + mm.dimension("movement_type") + mm.dimension("plan_id")
+ mm.filter("occurred_at", "between", (start, end)))
SELECT m.movement_type,
sub.plan_id,
SUM(m.amount_base_cents) AS amount_base
FROM metric_mrr_movement m
JOIN subscription sub ON sub.source_id = m.source_id
AND sub.external_id = m.subscription_id
WHERE m.occurred_at BETWEEN :occurred_at_start AND :occurred_at_end
GROUP BY m.movement_type, sub.plan_id
between binds two parameters using the dimension name as the prefix (here occurred_at_start / occurred_at_end). Equality and in filters use :<dimension_name>; other raw where(...) calls derive a parameter name from the column and operator (e.g. s_mrr_base_cents_gt).
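The derivation for raw `where(...)` parameters can be sketched as follows. The operator-to-suffix table is an assumption — only the `gt` case is confirmed by the example above:

```python
import re

# assumed operator-suffix table; only "gt" appears in the docs above
_OP_SUFFIX = {"=": "eq", "!=": "ne", ">": "gt", ">=": "gte", "<": "lt", "<=": "lte"}


def where_param_name(column, op):
    # "s.mrr_base_cents", ">" -> "s_mrr_base_cents_gt":
    # non-alphanumeric runs collapse to "_", then the operator suffix appends
    return re.sub(r"[^A-Za-z0-9]+", "_", column) + "_" + _OP_SUFFIX[op]
```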
Date-range semantics¶
All date ranges are closed-closed [start, end] — both endpoints are inclusive (see docs/definitions.md#date-range-convention). When a bare date is passed as a filter value, the filter layer coerces it to the appropriate day boundary so SQL BETWEEN captures the full calendar day against a TIMESTAMPTZ column:
- `between (start, end)` → `start` is pinned to `00:00:00.000000`, `end` to `23:59:59.999999`
- `<=` upper bound → coerced to end-of-day (inclusive)
- `<`, `>`, `>=` — no coercion; the bare date sits at `00:00:00` and behaves as the half-open boundary you'd expect
So between (date(2025, 7, 1), date(2025, 9, 30)) covers every event from Jul 1 midnight through the last microsecond of Sep 30.
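The `between` coercion reduces to pinning each bare date to its day boundary. A minimal sketch of that behavior (the function name is illustrative, not the filter layer's actual API):

```python
from datetime import date, datetime, time


def between_bounds(start, end):
    # pin start to midnight and end to the day's last microsecond so a
    # closed-closed BETWEEN on a timestamp column covers both full days
    return datetime.combine(start, time.min), datetime.combine(end, time.max)
```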
Implementation Notes¶
File Location¶
The cube machinery lives in tidemill/metrics/query.py — the same file that metrics import from. The concrete cubes are defined alongside their metrics (e.g., MRRSnapshotCube in tidemill/metrics/mrr/cubes.py).
Package Structure¶
tidemill/metrics/
├── __init__.py # re-exports Metric, QuerySpec, register, ...
├── base.py # Metric ABC + QuerySpec (includes `segment`, `compare`)
├── query.py # Cube, QueryFragment (incl. dynamic_joins + compare), compilation
├── registry.py # @register, discovery, dependency resolution, primary_cube lookup
├── route_helpers.py # parse_spec (now also reads ?segment / ?compare_segments),
│ query_metric (resolves segment IDs → SegmentDefs)
├── mrr/ # MRRSnapshotCube, MRRMovementCube + computed dims
├── churn/ # ChurnCustomerStateCube, ChurnEventCube
├── retention/ # RetentionCohortCube
├── ltv/ # LtvInvoiceCube
└── trials/ # TrialCube
tidemill/segments/
├── model.py # SegmentDef AST (Group, Condition), Segment.to_fragment, Compare
├── compiler.py # build_spec_fragment — bridges QuerySpec (incl. segment/compare) → Cube
└── routes.py # /api/segments CRUD + /validate
tidemill/attributes/
├── ingest.py # Stripe metadata fan-out, type inference, upsert helpers
├── registry.py # attribute_definition reads, distinct values for autocomplete
└── routes.py # /api/attributes + /api/customers/{id}/attributes
Each metric subpackage contains cubes.py, metric.py, routes.py, tables.py, and __init__.py. Metrics call await build_spec_fragment(cube, spec, db) in their query() methods to apply the full QuerySpec (dim / filter / granularity / segment / compare) in one shot.
SQLAlchemy Integration¶
All compilation targets SQLAlchemy Core's Select object — no string concatenation. The compiled statement is executed via AsyncSession.execute(stmt, params), consistent with the project's async-everywhere convention.
Adding a New Dimension¶
To make a new column queryable:
- Add the join (if the table isn't already reachable) to the model's `Joins` class
- Add a `Dim(...)` entry to the model's `Dimensions` class, referencing the join
- Done — the dimension is immediately available in `QuerySpec` and the API
# Example: add customer segment dimension to MRR
class Dimensions:
...
customer_segment = Dim("c.segment", join="customer")
No changes to compilation, API, or other metrics needed.
Adding a Computed Dimension¶
A computed dimension is a SQL expression rather than a single column reference. The compiler treats it the same as a column-backed dim — literal_column accepts arbitrary expressions, so dimension(...), filter(...), and the computed.<name> segment field path all work without changes.
# Example: bucket customers by tenure
_TENURE_BUCKET_SQL = (
"CASE"
" WHEN AGE(NOW(), c.created_at) < INTERVAL '30 days' THEN '<30d'"
" WHEN AGE(NOW(), c.created_at) < INTERVAL '180 days' THEN '30d-6mo'"
" ELSE '>6mo'"
" END"
)
class Dimensions:
...
tenure_bucket = Dim(_TENURE_BUCKET_SQL, join="customer", label="Tenure bucket")
The /api/metrics/{name}/fields endpoint surfaces computed dims alongside column-backed ones — the FE picker treats them identically. The _infer_dim_type helper in tidemill/api/routers/metrics.py reads ::date, ::int, CASE WHEN, etc. from the expression to pick a sensible widget type (date / number / string).
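The heuristic can be sketched roughly as below — a simplified assumption about the described behavior, not the actual `_infer_dim_type` implementation:

```python
def infer_dim_type(expr):
    # heuristic sketch: inspect the dimension SQL for Postgres casts;
    # anything else (plain columns, CASE WHEN buckets) is treated as a string
    lowered = expr.lower()
    if "::date" in lowered:
        return "date"
    if "::int" in lowered or "::numeric" in lowered:
        return "number"
    return "string"
```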