in bigquery_etl/cli/monitoring.py
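# Module-level dependencies assumed from the surrounding file (not shown in
# this excerpt): `os`, `sys`, `click`, the bigeye_sdk imports (`APIKeyAuth`,
# `datawatch_client_factory`, `Method`), and the bigquery_etl helpers
# (`ConfigLoader`, `Metadata`, `METRIC_STATUS_FAILURES`, `CUSTOM_RULES_FILE`,
# `paths_matching_name_pattern`, `extract_from_query_path`,
# `_sql_rules_from_file`).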
def run(name, project_id, sql_dir, workspace, base_url, marker):
    """Run Bigeye checks."""
    api_key = os.environ.get("BIGEYE_API_KEY")
    if api_key is None:
        click.echo(
            "Bigeye API token needs to be set via `BIGEYE_API_KEY` env variable."
        )
        sys.exit(1)
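
    # Build an authenticated Bigeye client for the target workspace and index
    # the custom SQL rules that already exist in the warehouse by their SQL
    # text, so locally defined rules can be matched against them.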
    api_auth = APIKeyAuth(base_url=base_url, api_key=api_key)
    client = datawatch_client_factory(api_auth, workspace_id=workspace)
    warehouse_id = ConfigLoader.get("monitoring", "bigeye_warehouse_id")
    existing_rules = {
        rule.custom_rule.sql: {"id": rule.id, "name": rule.custom_rule.name}
        for rule in client.get_rules_for_source(warehouse_id=warehouse_id).custom_rules
        if rule.custom_rule.name.endswith(marker or "")
    }
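
    # Collect the metadata.yaml files of all tables matching the name pattern.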
    metadata_files = paths_matching_name_pattern(
        name, sql_dir, project_id=project_id, files=["metadata.yaml"]
    )

    failed = False
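    # Run checks table by table; `failed` records whether any blocking check
    # failed so the command can exit non-zero at the end.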
    for metadata_file in list(set(metadata_files)):
        project, dataset, table = extract_from_query_path(metadata_file)
        try:
            metadata = Metadata.from_file(metadata_file)
            if metadata.monitoring and metadata.monitoring.enabled:
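                # Fetch the metrics Bigeye has configured for this table.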
                metrics = client.get_metric_info_batch_post(
                    table_name=table,
                    schema_name=f"{project}.{dataset}",
                    warehouse_ids=[warehouse_id],
                ).metrics
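
                # If a marker is set, only run the metrics tagged with it.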
                if marker:
                    metrics = [
                        metric for metric in metrics if metric.name.endswith(marker)
                    ]

                metric_ids = [metric.metric_configuration.id for metric in metrics]
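
                # Kick off all metric runs for this table in a single batch.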
                click.echo(
                    f"Trigger metric runs for {project}.{dataset}.{table}: {metric_ids}"
                )
                response = client.run_metric_batch_async(metric_ids=metric_ids)
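
                # Inspect the latest run of each metric. Any failure is
                # reported, but only metrics whose name ends in "[fail]" mark
                # the whole command as failed.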
                for metric_info in response:
                    # Guard against metrics that have never been run.
                    latest_metric_run = (
                        metric_info.latest_metric_runs[-1]
                        if metric_info.latest_metric_runs
                        else None
                    )
                    if (
                        latest_metric_run
                        and latest_metric_run.status in METRIC_STATUS_FAILURES
                    ):
                        if metric_info.metric_configuration.name.lower().endswith(
                            "[fail]"
                        ):
                            failed = True
                        click.echo(
                            f"Error running check {metric_info.metric_configuration.id}: "
                            f"{metric_info.active_issue.display_name}"
                        )
                        click.echo(
                            f"Check {base_url}/w/{workspace}/catalog/data-sources/metric/"
                            f"{metric_info.metric_configuration.id}/chart for more information."
                        )
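
                # Trigger the custom SQL rules defined next to the table's
                # metadata, if any.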
                if (metadata_file.parent / CUSTOM_RULES_FILE).exists():
                    for select_statement in _sql_rules_from_file(
                        metadata_file.parent / CUSTOM_RULES_FILE,
                        project,
                        dataset,
                        table,
                    ):
                        sql = select_statement.sql(dialect="bigquery")
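                        # Only rules that already exist in Bigeye (matched by
                        # their rendered SQL) can be triggered.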
                        if sql in existing_rules:
                            response = client._call_datawatch(
                                Method.GET,
                                url=f"/api/v1/custom-rules/run/{existing_rules[sql]['id']}",
                            )
                            click.echo(
                                f"Triggered custom rule {existing_rules[sql]['id']} "
                                f"for {project}.{dataset}.{table}"
                            )

                            latest_rule_run = response.get("latestRuns", [])
                            if latest_rule_run and latest_rule_run[-1].get("status") in {
                                status.name for status in METRIC_STATUS_FAILURES
                            }:
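                                # Rules marked "[warn]" are reported but do not
                                # fail the command.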
                                if not existing_rules[sql]["name"].lower().endswith(
                                    "[warn]"
                                ):
                                    failed = True
                                click.echo(
                                    f"Error running custom rule {existing_rules[sql]['id']} "
                                    f"for {project}.{dataset}.{table}. "
                                    f"Check {base_url}/w/{workspace}/catalog/data-sources/"
                                    f"{warehouse_id}/rules/{existing_rules[sql]['id']}/runs "
                                    "for more information."
                                )
        except FileNotFoundError:
            click.echo(f"No metadata file for: {project}.{dataset}.{table}")

    if failed:
        sys.exit(1)
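
# Example invocation, assuming this function is wired up as a `bqetl` click
# command (the subcommand and option names below are inferred from the
# function signature, not confirmed against the actual decorators):
#
#   bqetl monitoring run <table_name> --marker="[fail]"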