bigquery_etl/backfill/validate.py (95 lines of code) (raw):
"""Validate backfill entries."""
import datetime
from pathlib import Path
from typing import List
from ..backfill.parse import (
BACKFILL_FILE,
DEFAULT_REASON,
DEFAULT_WATCHER,
Backfill,
BackfillStatus,
)
from ..metadata.parse_metadata import METADATA_FILE, Metadata
from ..metadata.validate_metadata import SHREDDER_MITIGATION_LABEL
from .utils import MAX_BACKFILL_ENTRY_AGE_DAYS
def validate_duplicate_entry_dates(
    backfill_entry: Backfill, backfills: list[Backfill]
) -> None:
    """Reject a backfill entry whose entry date is already used by another entry.

    Args:
        backfill_entry: The entry being checked.
        backfills: The other entries to compare against; the caller is
            expected to leave ``backfill_entry`` itself out of this list.

    Raises:
        ValueError: If any entry in ``backfills`` has the same entry date.
    """
    has_duplicate = any(
        backfill_entry.entry_date == other.entry_date for other in backfills
    )
    if has_duplicate:
        raise ValueError(
            f"Duplicate backfill with entry date: {backfill_entry.entry_date}."
        )
def validate_excluded_dates(entry: Backfill) -> None:
    """Ensure the entry's excluded dates are in ascending order and unique.

    Args:
        entry: The backfill entry whose ``excluded_dates`` are inspected.

    Raises:
        ValueError: If the excluded dates are not sorted, or if any date
            appears more than once. The sort check runs first.
    """
    excluded = entry.excluded_dates

    if excluded != sorted(excluded):
        raise ValueError(
            f"Existing backfill entry with excluded dates not sorted: {entry.entry_date}."
        )

    # A set drops repeats, so a size mismatch means at least one duplicate.
    if len(set(excluded)) != len(excluded):
        raise ValueError(
            f"Existing backfill entry with duplicate excluded dates: {entry.entry_date}."
        )
def validate_default_reason(entry: Backfill) -> None:
    """Reject an entry whose reason is still the default placeholder.

    Args:
        entry: The backfill entry to check.

    Raises:
        ValueError: If ``entry.reason`` equals ``DEFAULT_REASON``.
    """
    reason = entry.reason
    if reason != DEFAULT_REASON:
        return
    raise ValueError(f"Default reason found: {reason}.")
def validate_default_watchers(entry: Backfill) -> None:
    """Reject an entry whose watchers include the default placeholder.

    Args:
        entry: The backfill entry to check.

    Raises:
        ValueError: If ``DEFAULT_WATCHER`` is among ``entry.watchers``.
    """
    watchers = entry.watchers
    uses_default_watcher = DEFAULT_WATCHER in watchers
    if uses_default_watcher:
        raise ValueError(f"Default watcher found: ({watchers}).")
def validate_entries_are_sorted(backfills: List[Backfill]) -> None:
    """Ensure backfill entries are ordered by entry date, newest first.

    Args:
        backfills: The entries in the order they should be checked.

    Raises:
        ValueError: If the entry dates are not in descending order.
    """
    dates_as_listed = [entry.entry_date for entry in backfills]

    # Build the expected ordering (most recent entry date first) on a copy.
    dates_newest_first = list(dates_as_listed)
    dates_newest_first.sort(reverse=True)

    if dates_as_listed != dates_newest_first:
        raise ValueError("Backfill entries are not sorted by entry dates")
def validate_shredder_mitigation(entry: Backfill, backfill_file: Path) -> None:
    """Check that the entry's shredder mitigation flag agrees with the metadata label.

    Only entries in Initiate status are checked; entries in any other status
    are accepted without reading the metadata file.

    Args:
        entry: The backfill entry to check.
        backfill_file: Path to the backfill file the entry came from. The
            metadata file is looked up in the same directory.

    Raises:
        ValueError: If the presence of ``SHREDDER_MITIGATION_LABEL`` in the
            metadata labels differs from ``entry.shredder_mitigation``.
    """
    if entry.status != BackfillStatus.INITIATE:
        return

    # Resolve the sibling metadata file through the parent directory. A string
    # replace over the whole path would also rewrite any parent directory
    # whose name happens to contain BACKFILL_FILE.
    metadata_file = Path(backfill_file).parent / METADATA_FILE
    metadata = Metadata.from_file(metadata_file)

    has_shredder_mitigation_label = SHREDDER_MITIGATION_LABEL in metadata.labels
    if has_shredder_mitigation_label != entry.shredder_mitigation:
        raise ValueError(
            f"{SHREDDER_MITIGATION_LABEL} label in {METADATA_FILE} and {BACKFILL_FILE} entry {entry.entry_date} should match."
        )
def validate_depends_on_past_end_date(backfill_entry: Backfill, backfill_file: Path):
    """Check that a depends_on_past table is not backfilled with an early end date.

    An end date before the entry date may result in data inconsistencies with
    depends_on_past tables. The check is skipped when the entry sets
    ``override_depends_on_past_end_date``, in which case the metadata file is
    not read at all.

    Args:
        backfill_entry: The backfill entry to check.
        backfill_file: Path to the backfill file; the table metadata is read
            from the same directory.

    Raises:
        ValueError: If the table's scheduling has ``depends_on_past`` set and
            the entry's end date is before its entry date.
    """
    if not backfill_entry.override_depends_on_past_end_date:
        metadata_path = backfill_file.parent / METADATA_FILE
        scheduling = Metadata.from_file(metadata_path).scheduling
        depends_on_past = scheduling.get("depends_on_past", False)

        if depends_on_past and backfill_entry.end_date < backfill_entry.entry_date:
            raise ValueError(
                "End date must be on or after the backfill entry date for a depends_on_past table. "
                "Use --override-depends-on-past-end-date flag with `bqetl backfill create` to override this check."
            )
def validate_duplicate_entry_with_initiate_status(
    backfill_entry: Backfill, backfills: list
) -> None:
    """Reject a second entry in Initiate status.

    Args:
        backfill_entry: The entry being checked. Nothing is validated unless
            this entry is itself in Initiate status.
        backfills: The other entries to compare against.

    Raises:
        ValueError: If ``backfill_entry`` and at least one entry in
            ``backfills`` are both in Initiate status.
    """
    if backfill_entry.status != BackfillStatus.INITIATE:
        return

    another_initiate_exists = any(
        other.status == BackfillStatus.INITIATE for other in backfills
    )
    if another_initiate_exists:
        raise ValueError(
            "Backfill entries cannot contain more than one entry with Initiate status"
        )
def validate_old_entry_date(backfill_entry: Backfill) -> None:
    """Reject an Initiate entry whose entry date is too far in the past to run.

    Args:
        backfill_entry: The backfill entry to check. Entries that are not in
            Initiate status are always accepted.

    Raises:
        ValueError: If the entry is in Initiate status and its entry date is
            more than ``MAX_BACKFILL_ENTRY_AGE_DAYS`` days before today.
    """
    if backfill_entry.status != BackfillStatus.INITIATE:
        return

    max_age = datetime.timedelta(days=MAX_BACKFILL_ENTRY_AGE_DAYS)
    oldest_allowed_date = datetime.date.today() - max_age

    if backfill_entry.entry_date < oldest_allowed_date:
        raise ValueError(
            "Backfill entries will not run if they are older than "
            f"{MAX_BACKFILL_ENTRY_AGE_DAYS} days old"
        )
def validate_file(file: Path) -> None:
    """Load every entry from a backfill file and validate them together.

    Args:
        file: Path to the backfill file to read and validate.
    """
    validate_entries(Backfill.entries_from_file(file), file)
def validate_entries(backfills: List[Backfill], backfill_file: Path) -> None:
    """Run every backfill validation over a list of entries.

    Each entry goes through the per-entry checks in a fixed order, so the
    first failing check determines which error is raised. Once all entries
    pass, the ordering of the whole list is checked.

    Args:
        backfills: The entries to validate, in file order.
        backfill_file: Path to the backfill file the entries came from.
    """
    for position, current in enumerate(backfills):
        # Cross-entry checks compare against the entries that follow only, so
        # each pair of entries is compared exactly once.
        remaining = backfills[position + 1 :]

        validate_default_watchers(current)
        validate_default_reason(current)
        validate_duplicate_entry_dates(current, remaining)
        validate_excluded_dates(current)
        validate_shredder_mitigation(current, backfill_file)
        validate_duplicate_entry_with_initiate_status(current, remaining)
        validate_depends_on_past_end_date(current, backfill_file)
        validate_old_entry_date(current)

    validate_entries_are_sorted(backfills)