in generator/views/table_view.py [0:0]
def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]:
"""Generate LookML for this view."""
view_defn: Dict[str, Any] = {"name": self.name}
# use schema for the table where channel=="release" or the first one
table = next(
(table for table in self.tables if table.get("channel") == "release"),
self.tables[0],
)["table"]
# add dimensions and dimension groups
dimensions = lookml_utils._generate_dimensions(table, dryrun=dryrun)
view_defn["dimensions"] = list(
filterfalse(lookml_utils._is_dimension_group, dimensions)
)
view_defn["dimension_groups"] = list(
filter(lookml_utils._is_dimension_group, dimensions)
)
# add tag "time_partitioning_field"
time_partitioning_fields: Set[str] = set(
# filter out falsy values
filter(
None, (table.get("time_partitioning_field") for table in self.tables)
)
)
if len(time_partitioning_fields) > 1:
raise ClickException(f"Multiple time_partitioning_fields for {self.name!r}")
elif len(time_partitioning_fields) == 1:
field_name = time_partitioning_fields.pop()
sql = f"${{TABLE}}.{field_name}"
for group_defn in view_defn["dimension_groups"]:
if group_defn["sql"] == sql:
if "tags" not in group_defn:
group_defn["tags"] = []
group_defn["tags"].append("time_partitioning_field")
break
else:
raise ClickException(
f"time_partitioning_field {field_name!r} not found in {self.name!r}"
)
[project, dataset, table_id] = table.split(".")
table_schema = dryrun.create(
project=project,
dataset=dataset,
table=table_id,
).get_table_schema()
nested_views = lookml_utils._generate_nested_dimension_views(
table_schema, self.name
)
if self.measures:
view_defn["measures"] = [
{"name": measure_name, **measure_parameters}
for measure_name, measure_parameters in self.measures.items()
]
# parameterize table name
if len(self.tables) > 1:
view_defn["parameters"] = [
{
"name": "channel",
"type": "unquoted",
"default_value": table,
"allowed_values": [
{
"label": _table["channel"].title(),
"value": _table["table"],
}
for _table in self.tables
],
}
]
view_defn["sql_table_name"] = "`{% parameter channel %}`"
else:
view_defn["sql_table_name"] = f"`{table}`"
return {"views": [view_defn] + nested_views}