in opmon/monitoring.py [0:0]
def validate(self) -> None:
"""Validate ETL and configs of opmon project."""
self._check_runnable()
if self.config.project and self.config.project.skip:
return
metrics_sql = self._get_metrics_sql(
submission_date=self.config.project.start_date, # type: ignore
first_run=True,
)
print(f"Dry run metrics SQL for {self.normalized_slug}")
if callable(self.before_execute_callback):
# Before and after are the same query for metrics: there's no
# modification preparing for dry run.
if isinstance(metrics_sql, list):
for idx, query in enumerate(metrics_sql):
self.before_execute_callback(
query,
None,
annotations={
"slug": self.slug,
"type": "validate_metrics_query_before",
"submission_date": self.config.project.start_date,
"part": f"part-{idx}",
},
)
self.before_execute_callback(
query,
None,
annotations={
"slug": self.slug,
"type": "validate_metrics_query_after",
"submission_date": self.config.project.start_date,
"part": f"part-{idx}",
},
)
else:
self.before_execute_callback(
metrics_sql,
None,
annotations={
"slug": self.slug,
"type": "validate_metrics_query_before",
"submission_date": self.config.project.start_date,
},
)
self.before_execute_callback(
metrics_sql,
None,
annotations={
"slug": self.slug,
"type": "validate_metrics_query_after",
"submission_date": self.config.project.start_date,
},
)
dry_run_query(metrics_sql)
dummy_metrics = {}
for summary in self.config.metrics:
if summary.metric.name not in dummy_metrics:
dummy_metrics[summary.metric.name] = "1"
if summary.metric.type == "histogram":
dummy_metrics[
summary.metric.name
] = """
[STRUCT(
3 AS bucket_count,
4 AS histogram_type,
12 AS `sum`,
[1, 12] AS `range`,
[STRUCT(0 AS key, 12 AS value)] AS `values`
)]
"""
metrics_table_dummy = f"""
(
SELECT
CURRENT_DATE() AS submission_date,
1 AS client_id,
NULL AS build_id,
{",".join([f"1 AS {d.name}" for d in self.config.dimensions])}
{"," if len(self.config.dimensions) > 0 else ""}
"foo" AS branch,
{",".join([f"{d} AS {metric}" for metric, d in dummy_metrics.items()])}
)
"""
statistics_sql = self._get_statistics_sql(
submission_date=self.config.project.start_date, # type: ignore
)
# The original query is more useful for inspection.
if callable(self.before_execute_callback):
self.before_execute_callback(
statistics_sql,
None,
annotations={
"slug": self.slug,
"type": "validate_statistics_query_before",
"submission_date": self.config.project.start_date,
},
)
statistics_sql = statistics_sql.replace(
f"`{self.project}.{self.dataset}.{self.normalized_slug}`",
metrics_table_dummy,
)
statistics_sql = statistics_sql.replace(
f"`{self.project}.{self.derived_dataset}.{self.normalized_slug}"
+ f"_v{SCHEMA_VERSIONS['metric']}`",
metrics_table_dummy,
)
print(f"Dry run statistics SQL for {self.normalized_slug}")
# But the modified query is what is actually submitted.
if callable(self.before_execute_callback):
self.before_execute_callback(
statistics_sql,
None,
annotations={
"slug": self.slug,
"type": "validate_statistics_query_after",
"submission_date": self.config.project.start_date,
},
)
dry_run_query(statistics_sql)
total_alerts = 0
for _ in self.config.alerts:
total_alerts += 1
if total_alerts > 0:
statistics_table_dummy = f"""
(
SELECT
CURRENT_DATE() AS submission_date,
NULL AS build_id,
"test" AS metric,
"test" AS statistic,
"disabled" AS branch,
{",".join([f"1 AS {d.name}" for d in self.config.dimensions])}
{"," if len(self.config.dimensions) > 0 else ""}
1.2 AS point,
NULL AS lower,
NULL AS upper,
NULL AS parameter
)
"""
alerts_sql = self._get_sql_for_alerts(
submission_date=self.config.project.start_date, # type: ignore
)
# Again, the original is more useful for inspection.
if callable(self.before_execute_callback):
self.before_execute_callback(
alerts_sql,
None,
annotations={
"slug": self.slug,
"type": "validate_alerts_query_before",
"submission_date": self.config.project.start_date,
},
)
alerts_sql = alerts_sql.replace(
f"`{self.project}.{self.dataset}.{self.normalized_slug}_statistics`",
statistics_table_dummy,
)
print(f"Dry run alerts SQL for {self.normalized_slug}")
if callable(self.before_execute_callback):
self.before_execute_callback(
alerts_sql,
None,
annotations={
"slug": self.slug,
"type": "validate_alerts_query_after",
"submission_date": self.config.project.start_date,
},
)
dry_run_query(alerts_sql)