def run()

in bigquery_etl/cli/monitoring.py [0:0]


def run(name, project_id, sql_dir, workspace, base_url, marker):
    """Run Bigeye checks."""
    api_key = os.environ.get("BIGEYE_API_KEY")
    if api_key is None:
        click.echo(
            "Bigeye API token needs to be set via `BIGEYE_API_KEY` env variable."
        )
        sys.exit(1)

    api_auth = APIKeyAuth(base_url=base_url, api_key=api_key)
    client = datawatch_client_factory(api_auth, workspace_id=workspace)
    warehouse_id = ConfigLoader.get("monitoring", "bigeye_warehouse_id")
    existing_rules = {
        rule.custom_rule.sql: {"id": rule.id, "name": rule.custom_rule.name}
        for rule in client.get_rules_for_source(warehouse_id=warehouse_id).custom_rules
        if rule.custom_rule.name.endswith(marker or "")
    }

    metadata_files = paths_matching_name_pattern(
        name, sql_dir, project_id=project_id, files=["metadata.yaml"]
    )

    failed = False

    for metadata_file in list(set(metadata_files)):
        project, dataset, table = extract_from_query_path(metadata_file)
        try:
            metadata = Metadata.from_file(metadata_file)
            if metadata.monitoring and metadata.monitoring.enabled:
                metrics = client.get_metric_info_batch_post(
                    table_name=table,
                    schema_name=f"{project}.{dataset}",
                    warehouse_ids=[warehouse_id],
                ).metrics

                if marker:
                    metrics = [
                        metric for metric in metrics if metric.name.endswith(marker)
                    ]

                metric_ids = [metric.metric_configuration.id for metric in metrics]

                click.echo(
                    f"Trigger metric runs for {project}.{dataset}.{table}: {metric_ids}"
                )
                response = client.run_metric_batch_async(metric_ids=metric_ids)
                for metric_info in response:
                    latest_metric_run = metric_info.latest_metric_runs[-1]
                    if (
                        latest_metric_run
                        and latest_metric_run.status in METRIC_STATUS_FAILURES
                    ):
                        if metric_info.metric_configuration.name.lower().endswith(
                            "[fail]"
                        ):
                            failed = True
                        click.echo(
                            f"Error running check {metric_info.metric_configuration.id}: {metric_info.active_issue.display_name}"
                        )
                        click.echo(
                            f"Check {base_url}/w/{workspace}/catalog/data-sources/metric/{metric_info.metric_configuration.id}/chart for more information."
                        )

                if (metadata_file.parent / CUSTOM_RULES_FILE).exists():
                    for select_statement in _sql_rules_from_file(
                        metadata_file.parent / CUSTOM_RULES_FILE,
                        project,
                        dataset,
                        table,
                    ):
                        sql = select_statement.sql(dialect="bigquery")
                        if sql in existing_rules:
                            response = client._call_datawatch(
                                Method.GET,
                                url=f"/api/v1/custom-rules/run/{existing_rules[sql]['id']}",
                            )
                            click.echo(
                                f"Triggered custom rule {existing_rules[sql]['id']} for {project}.{dataset}.{table}"
                            )

                            latest_rule_run = response.get("latestRuns", [])
                            if latest_rule_run and latest_rule_run[-1].get(
                                "status"
                            ) in {status.name for status in METRIC_STATUS_FAILURES}:
                                if (
                                    not existing_rules[sql]["name"]
                                    .lower()
                                    .endswith("[warn]")
                                ):
                                    failed = True

                                click.echo(
                                    f"Error running custom rule {existing_rules[sql]} for {project}.{dataset}.{table}. "
                                    + f"Check {base_url}/w/{workspace}/catalog/data-sources/{warehouse_id}/rules/{existing_rules[sql]['id']}/runs "
                                    + "for more information."
                                )

        except FileNotFoundError:
            print("No metadata file for: {}.{}.{}".format(project, dataset, table))

    if failed:
        sys.exit(1)