sync/metrichub.py (139 lines of code) (raw):
import re
from datahub.emitter.mce_builder import make_term_urn
from dataclasses import dataclass
from typing import List, Optional
import sqlglot as sqlglot
from metric_config_parser.config import ConfigCollection
from metric_config_parser.metric import MetricLevel
METRIC_HUB_REPO_URL = "https://github.com/mozilla/metric-hub"
LOOKER_METRICS_URL = "https://github.com/mozilla/metric-hub/tree/main/looker"
@dataclass
class MetricStatistic:
"""
Defines and aggregation of a metric across a specific population.
Metric-hub allows users to specify statistics on top of metrics.
Statistics summarize the distribution of metrics within a specific
time frame and population segment. These statistics allow for basic
analyses of a metric and are represented in Looker as measures.
"""
name: str
@property
def title_cased_name(self) -> str:
return self.name.replace("_", " ").title()
@dataclass
class MetricHubDefinition:
name: str
description: str
sql_definition: str
product: str
owners: Optional[List[str]]
level: Optional[MetricLevel]
bigquery_tables: Optional[List[str]]
data_source: Optional[str]
statistics: Optional[List[MetricStatistic]]
friendly_name: Optional[str]
deprecated: bool = False
@property
def display_name(self) -> str:
metric_name = self.name
if self.deprecated:
metric_name += " ⚠️"
if self.level:
if self.level == MetricLevel.GOLD:
metric_name += " 🥇"
elif self.level == MetricLevel.SILVER:
metric_name += " 🥈"
elif self.level == MetricLevel.BRONZE:
metric_name += " 🥉"
return metric_name
@property
def urn(self) -> str:
return f"{make_term_urn(f'Metric Hub.{self.product}.{self.name}')}"
@property
def title_cased_name(self) -> str:
return self.name.replace("_", " ").title()
def _raw_table_name(table: sqlglot.exp.Table) -> str:
"""
Adapted from bigquery-etl:
https://github.com/mozilla/bigquery-etl/blob/12c27464b1d5c41f15a6d3d9e2463547164e3518/bigquery_etl/dependency.py#L21
"""
return (
table.sql("bigquery", comments=False)
.split(" AS ", 1)[0] # remove alias
.replace("`", "") # remove quotes
)
def _extract_table_references(sql: str) -> List[str]:
"""
Return a list of tables referenced in the given SQL. Adapted from bigquery-etl:
https://github.com/mozilla/bigquery-etl/blob/12c27464b1d5c41f15a6d3d9e2463547164e3518/bigquery_etl/dependency.py#L31
"""
# sqlglot cannot handle scripts with variables and control statements
if re.search(r"^\s*DECLARE\b", sql, flags=re.MULTILINE):
return []
# sqlglot parses UDFs with keyword names incorrectly:
# https://github.com/tobymao/sqlglot/issues/1535
sql = re.sub(
r"\.(range|true|false|null)\(",
r".\1_(",
sql,
flags=re.IGNORECASE,
)
# sqlglot doesn't suppport OPTIONS on UDFs
sql = re.sub(
r"""OPTIONS\s*\(("([^"]|\\")*"|'([^']|\\')*'|[^)])*\)""",
"",
sql,
flags=re.MULTILINE | re.IGNORECASE,
)
# sqlglot doesn't fully support byte strings
sql = re.sub(
r"""b(["'])""",
r"\1",
sql,
flags=re.IGNORECASE,
)
query_statements = sqlglot.parse(sql, read="bigquery")
# If there's only one statement, and it's a Column token, it's the table name:
if len(query_statements) == 1 and isinstance(
query_statements[0], sqlglot.exp.Column
):
return [sql.replace("`", "")]
creates, tables = set(), set()
for statement in query_statements:
if statement is None:
continue
creates |= {
_raw_table_name(expr.this)
for expr in statement.find_all(sqlglot.exp.Create)
}
tables |= (
{_raw_table_name(table) for table in statement.find_all(sqlglot.exp.Table)}
# ignore references created in this query
- creates
# ignore CTEs created in this statement
- {cte.alias_or_name for cte in statement.find_all(sqlglot.exp.CTE)}
)
return sorted(tables)
def get_metric_definitions() -> List[MetricHubDefinition]:
config_collection = ConfigCollection.from_github_repos(
[METRIC_HUB_REPO_URL, LOOKER_METRICS_URL]
)
metrics = []
for definition in config_collection.definitions:
for (
metric_name,
metric,
) in definition.spec.metrics.definitions.items():
# Some metrics don't have data sources
# (e.g. ad_click_rate, chained metric used in jetstream)
tables = None
datasource = None
if metric.data_source is not None:
datasource = config_collection.get_data_source_definition(
slug=metric.data_source.name, app_name=definition.platform
)
tables = [
table.format(dataset=datasource.default_dataset)
for table in _extract_table_references(datasource.from_expression)
]
statistics = []
if metric.statistics is not None:
for statistic_name, _ in metric.statistics.items():
statistics.append(MetricStatistic(name=statistic_name))
metrics.append(
MetricHubDefinition(
name=metric.name,
description=metric.description or "",
owners=(
[metric.owner]
if isinstance(metric.owner, str)
else metric.owner
),
level=metric.level if metric.level else None,
friendly_name=(
metric.friendly_name if metric.friendly_name else None
),
deprecated=metric.deprecated or False,
sql_definition=metric.select_expression,
product=definition.platform,
bigquery_tables=tables,
statistics=statistics,
data_source=datasource.name if datasource else None,
)
)
return metrics