in generator/views/metric_definitions_view.py [0:0]
def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]:
"""Get this view as LookML."""
namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions(
self.namespace
)
if namespace_definitions is None:
return {}
# get all metric definitions that depend on the data source represented by this view
data_source_name = re.sub("^metric_definitions_", "", self.name)
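# e.g. a view named "metric_definitions_clients_daily" would map to the
# "clients_daily" data source (the data source name here is only illustrative)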
data_source_definition = MetricsConfigLoader.configs.get_data_source_definition(
data_source_name, self.namespace
)
if data_source_definition is None:
return {}
# todo: hide deprecated metrics?
metric_definitions = [
f"""{
MetricsConfigLoader.configs.get_env().from_string(metric.select_expression).render()
} AS {metric_slug},\n"""
for metric_slug, metric in namespace_definitions.metrics.definitions.items()
if metric.select_expression
and metric.data_source.name == data_source_name
and metric.type != "histogram"
]
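# each entry is a rendered "<select_expression> AS <metric_slug>,\n" fragment, e.g.
# "COUNTIF(active_hours_sum > 0) AS days_of_use,\n" (expression and slug are illustrative)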
if metric_definitions == []:
return {}
# Metric definitions are intended to be aggregated per client per date.
# A derived table is needed to do these aggregations. Instead of defining them as measures,
# we expose them as dimensions (which don't allow aggregations in their definitions)
# so that custom measures aggregating these per-client metrics can later be defined in Looker.
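# e.g. a Looker user could later create a custom measure such as
# sum(${metric_definitions_clients_daily.days_of_use}) to aggregate the per-client
# metric across clients (view and metric names here are illustrative)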
view_defn: Dict[str, Any] = {"name": self.name}
ignore_base_fields = [
"client_id",
"submission_date",
"submission",
"first_run",
] + [
metric_slug
for metric_slug, metric in namespace_definitions.metrics.definitions.items()
if metric.select_expression
and metric.data_source.name == data_source_name
and metric.type != "histogram"
]
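# base field dimensions with these names are skipped below: the client ID and date
# columns are handled explicitly in the derived table, and the metric slugs are
# already selected as metric columns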
base_view_dimensions = {}
joined_data_sources = []
# check if the metric data source has joins
# joined data sources are generally used for creating the "Base Fields"
if data_source_definition.joins:
# determine the dimensions selected by the joined data sources
for joined_data_source_slug, join in data_source_definition.joins.items():
joined_data_source = (
MetricsConfigLoader.configs.get_data_source_definition(
joined_data_source_slug, self.namespace
)
)
if joined_data_source.columns_as_dimensions:
joined_data_sources.append(joined_data_source)
date_filter = None
if joined_data_source.submission_date_column != "NULL":
date_filter = (
None
if joined_data_source.submission_date_column is None
or joined_data_source.submission_date_column == "NULL"
else f"{joined_data_source.submission_date_column} = '2023-01-01'"
)
# create Looker dimensions by doing a dryrun
query = MetricsConfigLoader.configs.get_data_source_sql(
joined_data_source_slug,
self.namespace,
where=date_filter,
).format(dataset=self.namespace)
base_view_dimensions[joined_data_source_slug] = (
lookml_utils._generate_dimensions_from_query(
query, dryrun=dryrun
)
)
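# the dryrun-generated dimensions are dicts with keys such as "name", "type" and
# "sql" (and sometimes "hidden"), which is how they are consumed further below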
if (
data_source_definition.client_id_column == "NULL"
and not base_view_dimensions
) or data_source_definition.columns_as_dimensions:
# if the metric data source has no client_id column and no dimensions came from joins,
# or if it explicitly exposes its columns as dimensions, use the dimensions
# of the data source itself as base fields
date_filter = None
if data_source_definition.submission_date_column != "NULL":
date_filter = (
"submission_date = '2023-01-01'"
if data_source_definition.submission_date_column is None
else f"{data_source_definition.submission_date_column} = '2023-01-01'"
)
query = MetricsConfigLoader.configs.get_data_source_sql(
data_source_definition.name,
self.namespace,
where=date_filter,
ignore_joins=True,
).format(dataset=self.namespace)
base_view_dimensions[data_source_definition.name] = (
lookml_utils._generate_dimensions_from_query(query, dryrun)
)
# track dimensions that have already been added to prevent duplicates, especially for
# time dimensions whose names get modified and could otherwise cause naming collisions
seen_dimensions = set()
# prepare base field data for query
base_view_fields = []
for data_source, dimensions in base_view_dimensions.items():
for dimension in dimensions:
if (
dimension["name"] not in ignore_base_fields
and dimension["name"] not in seen_dimensions
and "hidden" not in dimension
):
sql = (
f"{data_source}.{dimension['name'].replace('__', '.')} AS"
+ f" {data_source}_{dimension['name']},\n"
)
# date/time/timestamp suffixes are removed when generating LookML dimensions;
# however, the derived view SQL needs the original field name
if dimension["type"] == "time" and not dimension["sql"].endswith(
dimension["name"]
):
suffix = dimension["sql"].split(
dimension["name"].replace("__", ".")
)[-1]
sql = (
f"{data_source}.{(dimension['name']+suffix).replace('__', '.')} AS"
+ f" {data_source}_{dimension['name']},\n"
)
base_view_fields.append(
{
"name": f"{data_source}_{dimension['name']}",
"select_sql": f"{data_source}_{dimension['name']},\n",
"sql": sql,
}
)
seen_dimensions.add(dimension["name"])
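# e.g. a "country" dimension from a joined "events" data source (names illustrative) yields
# {"name": "events_country", "select_sql": "events_country,\n",
#  "sql": "events.country AS events_country,\n"}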
client_id_field = (
"NULL"
if data_source_definition.client_id_column == "NULL"
else f'{data_source_definition.client_id_column or "client_id"}'
)
# filters for date ranges
where_sql = " AND ".join(
[
f"""
{data_source.name}.{data_source.submission_date_column or "submission_date"}
BETWEEN
COALESCE(
SAFE_CAST(
{{% date_start submission_date %}} AS DATE
), CURRENT_DATE()) AND
COALESCE(
SAFE_CAST(
{{% date_end submission_date %}} AS DATE
), CURRENT_DATE())
"""
for data_source in [data_source_definition] + joined_data_sources
if data_source.submission_date_column != "NULL"
]
)
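# each clause renders roughly to (data source name illustrative):
#   events.submission_date BETWEEN
#     COALESCE(SAFE_CAST({% date_start submission_date %} AS DATE), CURRENT_DATE()) AND
#     COALESCE(SAFE_CAST({% date_end submission_date %} AS DATE), CURRENT_DATE())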
# filter on sample_id if such a field exists
for field in base_view_fields:
if field["name"].endswith("_sample_id"):
where_sql += f"""
AND
{field['name'].split('_sample_id')[0]}.sample_id < {{% parameter sampling %}}
"""
break
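# e.g. this appends "AND events.sample_id < {% parameter sampling %}" when an
# "events_sample_id" base field exists (data source name illustrative)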
view_defn["derived_table"] = {
"sql": f"""
SELECT
{"".join(metric_definitions)}
{"".join([field['select_sql'] for field in base_view_fields])}
{client_id_field} AS client_id,
{{% if aggregate_metrics_by._parameter_value == 'day' %}}
{data_source_definition.submission_date_column or "submission_date"} AS analysis_basis
{{% elsif aggregate_metrics_by._parameter_value == 'week' %}}
(FORMAT_DATE(
'%F',
DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"},
WEEK(MONDAY)))
) AS analysis_basis
{{% elsif aggregate_metrics_by._parameter_value == 'month' %}}
(FORMAT_DATE(
'%Y-%m',
{data_source_definition.submission_date_column or "submission_date"})
) AS analysis_basis
{{% elsif aggregate_metrics_by._parameter_value == 'quarter' %}}
(FORMAT_DATE(
'%Y-%m',
DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"},
QUARTER))
) AS analysis_basis
{{% elsif aggregate_metrics_by._parameter_value == 'year' %}}
(EXTRACT(
YEAR FROM {data_source_definition.submission_date_column or "submission_date"})
) AS analysis_basis
{{% else %}}
NULL as analysis_basis
{{% endif %}}
FROM
(
SELECT
{data_source_name}.*,
{"".join([field['sql'] for field in base_view_fields])}
FROM
{
MetricsConfigLoader.configs.get_data_source_sql(
data_source_name,
self.namespace,
select_fields=False
).format(dataset=self.namespace)
}
WHERE {where_sql}
)
GROUP BY
{"".join([field['select_sql'] for field in base_view_fields])}
client_id,
analysis_basis
"""
}
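# once rendered, the derived table SQL above has roughly this shape:
#   SELECT <metric expressions>, <base fields>, <client_id> AS client_id, <analysis_basis>
#   FROM (SELECT <data source>.*, <joined base fields> FROM <data source SQL>
#         WHERE <date range and sampling filters>)
#   GROUP BY <base fields>, client_id, analysis_basis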
view_defn["dimensions"] = self.get_dimensions()
view_defn["dimension_groups"] = self.get_dimension_groups()
# add the Looker dimensions generated for the base fields
for data_source, dimensions in base_view_dimensions.items():
for dimension in dimensions:
if dimension["name"] not in ignore_base_fields:
dimension["sql"] = (
"${TABLE}." + f"{data_source}_{dimension['name']}"
)
dimension["group_label"] = "Base Fields"
if not lookml_utils._is_dimension_group(dimension):
view_defn["dimensions"].append(dimension)
else:
view_defn["dimension_groups"].append(dimension)
# avoid duplicate dimensions
ignore_base_fields.append(dimension["name"])
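# e.g. a base field dimension ends up roughly as (field and data source names illustrative):
#   {"name": "country", "type": "string", "sql": "${TABLE}.events_country",
#    "group_label": "Base Fields"}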
view_defn["measures"] = self.get_measures(
view_defn["dimensions"],
)
view_defn["sets"] = self._get_sets()
view_defn["parameters"] = self._get_parameters(view_defn["dimensions"])
return {"views": [view_defn]}