def migrate()

in bigquery_etl/cli/monitoring.py [0:0]


def migrate(ctx, name: str, sql_dir: Optional[str], project_id: Optional[str]) -> None:
    """Migrate checks.sql files to BigConfig."""
    check_files = paths_matching_name_pattern(
        name, sql_dir, project_id=project_id, files=["checks.sql"]
    )

    for check_file in list(set(check_files)):
        project, dataset, table = extract_from_query_path(check_file)
        checks_migrated = 0
        try:
            metadata_file = check_file.parent / METADATA_FILE
            metadata = Metadata.from_file(check_file.parent / METADATA_FILE)
            metadata.monitoring = {"enabled": True}
            metadata.write(metadata_file)

            ctx.invoke(
                update,
                name=metadata_file.parent,
                sql_dir=sql_dir,
                project_id=project_id,
            )

            # clearing Bigeye file index after running the `update` command
            # this is necessary as Bigeye throws an exception when it sees the same config file twice,
            # which will be the case after doing `update` previously
            _BIGEYE_YAML_FILE_IX.clear()
            bigconfig_file = check_file.parent / BIGCONFIG_FILE
            if bigconfig_file.exists():
                bigconfig = BigConfig.load(bigconfig_file)
            else:
                bigconfig = BigConfig(type="BIGCONFIG_FILE")

            checks = check_file.read_text()
            metrics_by_column = defaultdict(list)

            # map pre-defined ETL checks to Bigeye checks
            if matches := re.findall(
                r"not_null\(\[([^\]\)]*)\](?:, \"(.*)\")?\)", checks
            ):
                for match in matches:
                    checks_migrated += 1
                    columns = [
                        col.replace('"', "")
                        for col in match[0]
                        .replace("[", "")
                        .replace("]", "")
                        .split(", ")
                    ]
                    metric_definition = SimpleMetricDefinition(
                        metric_type=SimplePredefinedMetric(
                            type="PREDEFINED",
                            predefined_metric=SimplePredefinedMetricName.PERCENT_NOT_NULL,
                        ),
                        threshold=SimpleConstantThreshold(
                            type="CONSTANT", lower_bound=1.0
                        ),
                    )

                    for column in columns:
                        metrics_by_column[
                            f"{project}.{project}.{dataset}.{table}.{column}"
                        ].append(metric_definition)

            if matches := re.findall(
                r"min_row_count\(([^,\)]*)(?:, \"(.*)\")?\)", checks
            ):
                for match in matches:
                    checks_migrated += 1
                    metrics_by_column[
                        f"{project}.{project}.{dataset}.{table}.*"
                    ].append(
                        SimpleMetricDefinition(
                            metric_type=SimplePredefinedMetric(
                                type="PREDEFINED",
                                predefined_metric=SimplePredefinedMetricName.COUNT_ROWS,
                            ),
                            threshold=SimpleConstantThreshold(
                                type="CONSTANT", lower_bound=int(match[0])
                            ),
                        )
                    )

            if matches := re.findall(r"is_unique\(([^\]\)]*)(?:, \"(.*)\")?\)", checks):
                for match in matches:
                    checks_migrated += 1
                    columns = [
                        col.replace('"', "")
                        for col in match[0]
                        .replace("[", "")
                        .replace("]", "")
                        .split(", ")
                    ]
                    metric_definition = SimpleMetricDefinition(
                        metric_type=SimplePredefinedMetric(
                            type="PREDEFINED",
                            predefined_metric=SimplePredefinedMetricName.COUNT_DUPLICATES,
                        ),
                        threshold=SimpleConstantThreshold(
                            type="CONSTANT", lower_bound=0.0
                        ),
                    )

                    for column in columns:
                        metrics_by_column[
                            f"{project}.{project}.{dataset}.{table}.{column}"
                        ].append(metric_definition)

            if matches := re.findall(r"in_range\((.*), (.*), (.*), \"(.*)\"\)", checks):
                for match in matches:
                    checks_migrated += 1
                    columns = [
                        col.replace('"', "")
                        for col in match[0]
                        .replace("[", "")
                        .replace("]", "")
                        .split(", ")
                    ]
                    metric_definition = SimpleMetricDefinition(
                        metric_name="Range",
                        metric_type=SimplePredefinedMetric(
                            type="PREDEFINED",
                            predefined_metric=SimplePredefinedMetricName.MIN,
                        ),
                        threshold=SimpleConstantThreshold(
                            type="CONSTANT",
                            lower_bound=int(match[1]) if match[1] != "none" else None,
                            upper_bound=int(match[2]) if match[2] != "none" else None,
                        ),
                    )

                    for column in columns:
                        metrics_by_column[
                            f"{project}.{project}.{dataset}.{table}.{column}"
                        ].append(metric_definition)

            if matches := re.findall(
                r"value_length\(column=\"(.*)\", expected_length=(.*), where=\"(.*)\"\)",
                checks,
            ):
                for match in matches:
                    checks_migrated += 1
                    metric_definition = SimpleMetricDefinition(
                        metric_name="Value Length",
                        metric_type=SimplePredefinedMetric(
                            type="PREDEFINED",
                            predefined_metric=SimplePredefinedMetricName.STRING_LENGTH_MIN,
                        ),
                        threshold=SimpleConstantThreshold(
                            type="CONSTANT",
                            lower_bound=int(match[1]),
                            upper_bound=int(match[1]),
                        ),
                    )

                    for column in columns:
                        metrics_by_column[
                            f"{project}.{project}.{dataset}.{table}.{column}"
                        ].append(metric_definition)

            deployments = []
            for column_selector, metrics in metrics_by_column.items():
                deployments.append(
                    TagDeployment(
                        column_selectors=[ColumnSelector(name=column_selector)],
                        metrics=metrics,
                    )
                )

            bigconfig.tag_deployments += [TagDeploymentSuite(deployments=deployments)]

            bigconfig.save(
                output_path=bigconfig_file.parent,
                default_file_name=bigconfig_file.stem,
            )

            total_checks = checks.count("#fail") + checks.count("#warn")
            click.echo(
                f"Migrated {checks_migrated} of {total_checks} checks to {bigconfig_file}."
            )
            if checks_migrated < total_checks:
                click.echo(
                    f"There might be custom SQL checks that need to be migrated manually for {check_file.parent}"
                )

        except FileNotFoundError:
            print(f"No metadata file for: {check_file.parent}")

    click.echo(
        "Please manually check the migration logic as it might not be 100% correct"
    )