bigquery_etl/backfill/parse.py
"""Parse backfill entries."""
import enum
import os
from datetime import date
from pathlib import Path
from typing import List, Optional
import attr
import yaml
from bigquery_etl.query_scheduling.utils import is_email_or_github_identity
BACKFILL_FILE = "backfill.yaml"
DEFAULT_WATCHER = "nobody@mozilla.com"
DEFAULT_REASON = "Please provide a reason for the backfill and links to any related bugzilla or jira tickets"
DEFAULT_BILLING_PROJECT = "moz-fx-data-backfill-slots"
class UniqueKeyLoader(yaml.SafeLoader):
"""YAML loader to check duplicate keys."""
def construct_mapping(self, node, deep=False):
"""Create mapping while checking for duplicate keys."""
mapping = set()
for key_node, value_node in node.value:
key = self.construct_object(key_node, deep=deep)
if key in mapping:
raise ValueError(
f"Backfill entry already exists with entry date: {key}."
)
mapping.add(key)
return super().construct_mapping(node, deep)
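
# Illustrative failure mode (hypothetical YAML values, sketch only): backfill.yaml is
# keyed by entry date, and plain yaml.SafeLoader would silently keep only the last
# value for a duplicated key. Loading a file such as
#
#   2021-05-03:
#     start_date: 2021-01-01
#   2021-05-03:
#     start_date: 2021-02-01
#
# with yaml.load(stream, Loader=UniqueKeyLoader) instead fails fast with
# ValueError("Backfill entry already exists with entry date: 2021-05-03.").
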
class Literal(str):
    """Represents a YAML literal."""

    pass


def literal_presenter(dumper, data):
    """Literal representer for YAML output."""
    return dumper.represent_scalar("tag:yaml.org,2002:str", data)


yaml.add_representer(Literal, literal_presenter)


class BackfillStatus(enum.Enum):
    """Represents backfill status types."""

    INITIATE = "Initiate"
    COMPLETE = "Complete"
@attr.s(auto_attribs=True)
class Backfill:
"""
Representation of a backfill entry configuration.
Uses attrs to simplify the class definition and provide validation.
Docs: https://www.attrs.org
"""
entry_date: date = attr.ib()
start_date: date = attr.ib()
end_date: date = attr.ib()
excluded_dates: List[date] = attr.ib()
reason: str = attr.ib()
watchers: List[str] = attr.ib()
status: BackfillStatus = attr.ib()
custom_query_path: Optional[str] = attr.ib(None)
shredder_mitigation: Optional[bool] = attr.ib(False)
override_retention_limit: Optional[bool] = attr.ib(False)
override_depends_on_past_end_date: Optional[bool] = attr.ib(False)
billing_project: Optional[str] = attr.ib(None)
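
    # Illustrative construction with hypothetical values; the validators below run on
    # instantiation, so a start date after the end date, an unsorted excluded_dates
    # list, or an empty reason would each raise ValueError here:
    #
    #   Backfill(
    #       entry_date=date(2021, 5, 3),
    #       start_date=date(2021, 1, 3),
    #       end_date=date(2021, 5, 3),
    #       excluded_dates=[date(2021, 2, 3)],
    #       reason="test backfill",
    #       watchers=["someone@mozilla.com"],
    #       status=BackfillStatus.COMPLETE,
    #   )
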
def __str__(self):
"""Return print friendly string of backfill object."""
backfill_str = f"""
entry_date = {self.entry_date}
start_date = {self.start_date}
end_date = {self.end_date}
""".rstrip()
if self.excluded_dates:
backfill_str += f"""
excluded_dates = {[str(e) for e in self.excluded_dates]}
""".rstrip()
backfill_str += f"""
reason = {self.reason}
watcher(s) = {self.watchers}
status = {self.status.value}
custom_query_path = {self.custom_query_path}
shredder_mitigation = {self.shredder_mitigation}
override_retention_limit = {self.override_retention_limit}
override_depends_on_past_end_date = {self.override_depends_on_past_end_date}
billing_project = {self.billing_project}
"""
return backfill_str.replace("'", "")

    @entry_date.validator
def validate_entry_date(self, attribute, value):
"""Check that provided entry date is not in the future."""
if date.today() < value:
raise ValueError(f"Backfill entry {value} can't be in the future.")

    @start_date.validator
    def validate_start_date(self, attribute, value):
        """Check that provided start date is not after the end date or the entry date."""
if self.end_date < value or self.entry_date < value:
raise ValueError(f"Invalid start date: {value}.")

    @end_date.validator
    def validate_end_date(self, attribute, value):
        """Check that provided end date is not before the start date and not after the entry date."""
if value < self.start_date or value > self.entry_date:
raise ValueError(f"Invalid end date: {value}.")

    @excluded_dates.validator
    def validate_excluded_dates(self, attribute, value):
        """Check that provided excluded dates fall strictly between the start and end dates, are sorted, and contain no duplicates."""
if not all(map(lambda e: self.start_date < e < self.end_date, value)):
raise ValueError(f"Invalid excluded dates: {value}.")
if not value == sorted(value):
raise ValueError(
f"Existing backfill entry with excluded dates not sorted: {value}."
)
if not len(value) == len(set(value)):
raise ValueError(
f"Existing backfill entry with duplicate excluded dates: {value}."
)

    @watchers.validator
    def validate_watchers(self, attribute, value):
        """Check that provided watchers are valid emails or GitHub identities and contain no duplicates."""
if not value or not all(
map(lambda e: e and is_email_or_github_identity(e), value)
):
raise ValueError(f"Invalid email or Github identity for watchers: {value}.")
if len(value) != len(set(value)):
raise ValueError(f"Duplicate watcher in ({value}).")

    @reason.validator
    def validate_reason(self, attribute, value):
        """Check that provided reason is not empty."""
if not value:
raise ValueError("Reason in backfill entry should not be empty.")

    @status.validator
def validate_status(self, attribute, value):
"""Check that provided status is valid."""
if not hasattr(BackfillStatus, value.name):
raise ValueError(f"Invalid status: {value.name}.")

    @billing_project.validator
def validate_billing_project(self, attribute, value):
"""Check that billing project is valid."""
if value and not value.startswith("moz-fx-data-backfill-"):
raise ValueError(
f"Invalid billing project: {value}. Please use one of the projects assigned to backfills."
)

    @staticmethod
def is_backfill_file(file_path: Path) -> bool:
"""Check if the provided file is a backfill file."""
return os.path.basename(file_path) == BACKFILL_FILE
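
    # Illustrative check with hypothetical paths:
    #
    #   Backfill.is_backfill_file(Path("sql/project/dataset/table_v1/backfill.yaml"))  # True
    #   Backfill.is_backfill_file(Path("sql/project/dataset/table_v1/metadata.yaml"))  # False
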
@classmethod
def entries_from_file(
cls, file: Path, status: Optional[str] = None
) -> List["Backfill"]:
"""
Parse all backfill entries from the provided yaml file.
Return a list with all backfill entries.
@param status: optional status param for filtering backfill entries with specific status.
If status is not provided, all backfill entries will be returned.
"""
if not cls.is_backfill_file(file):
raise ValueError(f"Invalid file: {file}.")
backfill_entries: List[Backfill] = []
with open(file, "r") as yaml_stream:
try:
backfills = yaml.load(yaml_stream, Loader=UniqueKeyLoader) or {}
for entry_date, entry in backfills.items():
if status is not None and entry["status"] != status:
continue
backfill = cls(
entry_date=entry_date,
start_date=entry["start_date"],
end_date=entry["end_date"],
excluded_dates=entry.get("excluded_dates", []),
reason=entry["reason"],
watchers=entry["watchers"],
status=BackfillStatus[entry["status"].upper()],
custom_query_path=entry.get("custom_query_path", None),
shredder_mitigation=entry.get("shredder_mitigation", False),
override_retention_limit=entry.get(
"override_retention_limit", False
),
override_depends_on_past_end_date=entry.get(
"override_depends_on_past_end_date", False
),
billing_project=entry.get("billing_project", None),
)
backfill_entries.append(backfill)
except yaml.YAMLError as e:
raise ValueError(f"Unable to parse Backfill file {file}") from e
except ValueError as e:
raise ValueError(f"Unable to parse Backfill file {file}: {e}") from e
return backfill_entries

    def to_yaml(self) -> str:
        """Return the backfill entry as a YAML string for writing to file."""
entry_dict = self.__dict__.copy()
del entry_dict["entry_date"]
entry_dict["status"] = self.status.value
entry_dict["excluded_dates"] = (
sorted(self.excluded_dates) if len(self.excluded_dates) > 0 else None
)
yaml_dict = {
self.entry_date: {
name: value for name, value in entry_dict.items() if value is not None
}
}
return yaml.dump(
yaml_dict,
sort_keys=False,
)
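
# Illustrative to_yaml() output for the hypothetical entry sketched in entries_from_file
# above: the entry date becomes the top-level key, the status enum is written as its
# value, optional fields left as None are omitted, and excluded_dates is dropped
# entirely when empty:
#
#   2021-05-03:
#     start_date: 2021-01-03
#     end_date: 2021-05-03
#     excluded_dates:
#     - 2021-02-03
#     reason: test backfill
#     watchers:
#     - someone@mozilla.com
#     status: Complete
#     shredder_mitigation: false
#     override_retention_limit: false
#     override_depends_on_past_end_date: false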