in bigquery_etl/cli/monitoring.py [0:0]
def migrate(ctx, name: str, sql_dir: Optional[str], project_id: Optional[str]) -> None:
"""Migrate checks.sql files to BigConfig."""
check_files = paths_matching_name_pattern(
name, sql_dir, project_id=project_id, files=["checks.sql"]
)
for check_file in list(set(check_files)):
project, dataset, table = extract_from_query_path(check_file)
checks_migrated = 0
try:
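            # Enable monitoring in the table's metadata.yaml and run the `update`
            # command so a base BigConfig file is generated for it.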
            metadata_file = check_file.parent / METADATA_FILE
            metadata = Metadata.from_file(metadata_file)
metadata.monitoring = {"enabled": True}
metadata.write(metadata_file)
ctx.invoke(
update,
name=metadata_file.parent,
sql_dir=sql_dir,
project_id=project_id,
)
            # Clear Bigeye's file index after running the `update` command;
            # Bigeye raises an exception when it sees the same config file twice,
            # which would be the case here because `update` already loaded it.
_BIGEYE_YAML_FILE_IX.clear()
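            # Extend the BigConfig generated by `update` if it exists, otherwise
            # start from an empty one.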
bigconfig_file = check_file.parent / BIGCONFIG_FILE
if bigconfig_file.exists():
bigconfig = BigConfig.load(bigconfig_file)
else:
bigconfig = BigConfig(type="BIGCONFIG_FILE")
checks = check_file.read_text()
metrics_by_column = defaultdict(list)
# map pre-defined ETL checks to Bigeye checks
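            # not_null([col1, col2], "<where>") -> PERCENT_NOT_NULL with a constant
            # lower bound per listed column; optional where clauses are not carried over.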
if matches := re.findall(
r"not_null\(\[([^\]\)]*)\](?:, \"(.*)\")?\)", checks
):
for match in matches:
checks_migrated += 1
columns = [
col.replace('"', "")
for col in match[0]
.replace("[", "")
.replace("]", "")
.split(", ")
]
metric_definition = SimpleMetricDefinition(
metric_type=SimplePredefinedMetric(
type="PREDEFINED",
predefined_metric=SimplePredefinedMetricName.PERCENT_NOT_NULL,
),
threshold=SimpleConstantThreshold(
type="CONSTANT", lower_bound=1.0
),
)
for column in columns:
metrics_by_column[
f"{project}.{project}.{dataset}.{table}.{column}"
].append(metric_definition)
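            # min_row_count(n, "<where>") -> COUNT_ROWS with a constant lower bound of n,
            # deployed against the table-level wildcard column selector.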
if matches := re.findall(
r"min_row_count\(([^,\)]*)(?:, \"(.*)\")?\)", checks
):
for match in matches:
checks_migrated += 1
metrics_by_column[
f"{project}.{project}.{dataset}.{table}.*"
].append(
SimpleMetricDefinition(
metric_type=SimplePredefinedMetric(
type="PREDEFINED",
predefined_metric=SimplePredefinedMetricName.COUNT_ROWS,
),
threshold=SimpleConstantThreshold(
type="CONSTANT", lower_bound=int(match[0])
),
)
)
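            # is_unique([col1, col2], "<where>") -> COUNT_DUPLICATES per listed column,
            # which must stay at 0.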
if matches := re.findall(r"is_unique\(([^\]\)]*)(?:, \"(.*)\")?\)", checks):
for match in matches:
checks_migrated += 1
columns = [
col.replace('"', "")
for col in match[0]
.replace("[", "")
.replace("]", "")
.split(", ")
]
metric_definition = SimpleMetricDefinition(
metric_type=SimplePredefinedMetric(
type="PREDEFINED",
predefined_metric=SimplePredefinedMetricName.COUNT_DUPLICATES,
),
                        threshold=SimpleConstantThreshold(
                            type="CONSTANT", upper_bound=0.0
                        ),
)
for column in columns:
metrics_by_column[
f"{project}.{project}.{dataset}.{table}.{column}"
].append(metric_definition)
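            # in_range([cols], min, max, "<where>") -> MIN metric ("Range") with constant
            # lower/upper bounds; "none" bounds are left unset.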
if matches := re.findall(r"in_range\((.*), (.*), (.*), \"(.*)\"\)", checks):
for match in matches:
checks_migrated += 1
columns = [
col.replace('"', "")
for col in match[0]
.replace("[", "")
.replace("]", "")
.split(", ")
]
metric_definition = SimpleMetricDefinition(
metric_name="Range",
metric_type=SimplePredefinedMetric(
type="PREDEFINED",
predefined_metric=SimplePredefinedMetricName.MIN,
),
threshold=SimpleConstantThreshold(
type="CONSTANT",
lower_bound=int(match[1]) if match[1] != "none" else None,
upper_bound=int(match[2]) if match[2] != "none" else None,
),
)
for column in columns:
metrics_by_column[
f"{project}.{project}.{dataset}.{table}.{column}"
].append(metric_definition)
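            # value_length(column="col", expected_length=n, where="<where>") ->
            # STRING_LENGTH_MIN with both bounds pinned to n.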
if matches := re.findall(
r"value_length\(column=\"(.*)\", expected_length=(.*), where=\"(.*)\"\)",
checks,
):
                for match in matches:
                    checks_migrated += 1
                    # the regex captures the column name without surrounding quotes
                    column = match[0]
                    metric_definition = SimpleMetricDefinition(
                        metric_name="Value Length",
                        metric_type=SimplePredefinedMetric(
                            type="PREDEFINED",
                            predefined_metric=SimplePredefinedMetricName.STRING_LENGTH_MIN,
                        ),
                        threshold=SimpleConstantThreshold(
                            type="CONSTANT",
                            lower_bound=int(match[1]),
                            upper_bound=int(match[1]),
                        ),
                    )
                    metrics_by_column[
                        f"{project}.{project}.{dataset}.{table}.{column}"
                    ].append(metric_definition)
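            # Group the collected metric definitions into one tag deployment per column selector.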
deployments = []
for column_selector, metrics in metrics_by_column.items():
deployments.append(
TagDeployment(
column_selectors=[ColumnSelector(name=column_selector)],
metrics=metrics,
)
)
bigconfig.tag_deployments += [TagDeploymentSuite(deployments=deployments)]
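            # Write the merged BigConfig back next to the checks.sql file.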
bigconfig.save(
output_path=bigconfig_file.parent,
default_file_name=bigconfig_file.stem,
)
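            # Each check in checks.sql is annotated with a `#fail` or `#warn` marker,
            # so their combined count approximates the total number of checks.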
total_checks = checks.count("#fail") + checks.count("#warn")
click.echo(
f"Migrated {checks_migrated} of {total_checks} checks to {bigconfig_file}."
)
if checks_migrated < total_checks:
click.echo(
f"There might be custom SQL checks that need to be migrated manually for {check_file.parent}"
)
except FileNotFoundError:
print(f"No metadata file for: {check_file.parent}")
click.echo(
"Please manually check the migration logic as it might not be 100% correct"
)