bigquery_etl/query_scheduling/utils.py (66 lines of code) (raw):
"""Utility functions for scheduling queries."""
import re
from datetime import datetime
TIMEDELTA_RE = re.compile(
    r"^-?((?P<hours>\d+)h)?((?P<minutes>\d+)m)?((?P<seconds>\d+)s)?$"
)


def is_timedelta_string(s):
    """
    Check whether a provided string is in a timedelta format.

    Timedeltas in configs are specified like: 1h, 30m, 1h15m, ...
    An optional leading "-" is allowed.

    Returns the regex match object (truthy) when `s` is a timedelta string
    and None otherwise, so the result can be used as a boolean.
    """
    match = TIMEDELTA_RE.match(s)
    # Every component of TIMEDELTA_RE is optional, so strings such as "" or
    # "-" satisfy the pattern without specifying any duration. Require at
    # least one of the hours/minutes/seconds groups to have participated.
    if match is None or not any(match.groupdict().values()):
        return None
    return match
def validate_timedelta_string(s):
    """Ensure `s` is a timedelta string, raising ValueError when it is not."""
    if is_timedelta_string(s):
        return
    hint = " Timedeltas should be specified like '1h', '45m', '10s', '1h30m', etc."
    raise ValueError(f"Invalid timedelta value '{s}'." + hint)
def is_date_string(s):
    """
    Check whether a string is a valid date string formatted like YYYY-MM-DD.

    Returns True when `s` can be parsed with the format "%Y-%m-%d" and False
    otherwise. Non-string values (for example None) are reported as invalid
    instead of letting datetime.strptime raise a TypeError.

    Parsing is delegated to datetime.strptime, which also accepts months and
    days without a leading zero (e.g. "2021-1-5").
    """
    # strptime raises TypeError, not ValueError, for non-string input; a
    # predicate should answer False rather than crash.
    if not isinstance(s, str):
        return False
    try:
        datetime.strptime(s, "%Y-%m-%d")
    except ValueError:
        return False
    return True
def is_email(s):
    """
    Check whether the provided string is a valid email address.

    This is a shape check only: some text, an "@", and a domain part that
    contains a dot, with exactly one "@" in the whole string.

    Returns the regex match object (truthy) on success and None otherwise.
    """
    # https://stackoverflow.com/questions/8022530/how-to-check-for-valid-email-address
    # fullmatch instead of match: match only anchors the start of the string,
    # which let values with a second "@" (e.g. "a@b.c@d") pass.
    return re.fullmatch(r"[^@]+@[^@]+\.[^@]+", s)
def is_github_identity(s):
    """
    Check if the given string matches the format of a Github identity.

    Accepted values look like `mozilla/<name>` or `@mozilla/<name>`, where
    `<name>` consists of letters, digits, "_", "-" or ".".

    Returns the regex match object (truthy) on success and None otherwise.
    """
    # The previous pattern used the character class `[@mozilla]+`, which
    # matches any run of the characters @, m, o, z, i, l, a (so "zzz/team" or
    # "mom/x" passed) and did not anchor the end of the string.
    # NOTE(review): the literal prefix "mozilla" with an optional "@" is
    # inferred from that character class - confirm no other org is expected.
    return re.fullmatch(r"@?mozilla/[a-zA-Z0-9_.-]+", s)
def is_email_or_github_identity(s):
    """Accept a string that is an email address or a Github identity.

    The email check runs first; its result is returned when truthy, otherwise
    the result of the Github identity check is returned.
    """
    email_result = is_email(s)
    if email_result:
        return email_result
    return is_github_identity(s)
# DAG names start with "bqetl_", optionally preceded by "private_", and must
# have at least one character after that prefix.
DAG_NAME_RE = re.compile("^(private_)?bqetl_.+$")


def is_valid_dag_name(name):
    """Return the DAG_NAME_RE match for `name`, or None if the name is invalid."""
    return re.match(DAG_NAME_RE, name)
# https://stackoverflow.com/questions/14203122/create-a-regular-expression-for-cron-statement
# Three alternatives: a named preset, a cron expression with 5 to 7 fields,
# or a timedelta string such as "3h" or "1h30m".
SCHEDULE_INTERVAL_RE = re.compile(
    r"^(once|hourly|daily|weekly|monthly|yearly|"
    r"((((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*/\d+|\*) ?){5,7})|"
    r"((\d+h)?(\d+m)?(\d+s)?))$"
)


def is_schedule_interval(interval):
    """
    Check whether the provided string is a valid schedule interval.

    Returns the regex match object (truthy) when `interval` is a preset name,
    a cron expression or a timedelta string, and None otherwise.
    """
    match = SCHEDULE_INTERVAL_RE.match(interval)
    # The timedelta alternative is made up solely of optional parts, so the
    # pattern also matches the empty string. A match that consumed no
    # characters does not describe any schedule and is rejected.
    if match is None or not match.group(0):
        return None
    return match
def schedule_interval_delta(schedule_interval1, schedule_interval2):
    """
    Return the time delta between two schedule intervals as timedelta string.

    Preset names are expanded to cron expressions and every "*" is treated
    as 0. The result is `schedule_interval2` minus `schedule_interval1`,
    computed from the minute, hour and day fields and formatted as
    "<seconds>s" (the number may be negative).

    Returns None when either argument is not a schedule interval, or when
    either one is not a five-field cron expression made up of plain numbers
    after the substitutions above.
    """
    for candidate in (schedule_interval1, schedule_interval2):
        if not is_schedule_interval(candidate):
            return None

    presets = {
        "yearly": "0 0 1 1 *",
        "monthly": "0 0 1 * *",
        "weekly": "0 0 * * 0",
        "daily": "0 0 * * *",
        "hourly": "0 * * * *",
        "once": "* * * * *",
    }
    five_field_cron = re.compile(
        r"^(?P<minutes>\d+) (?P<hours>\d+) (?P<day>\d+) (?P<month>\d+) (?P<dow>\d+)$"
    )

    # Expand presets, then read wildcards as 0 so each field is numeric.
    first = five_field_cron.match(
        presets.get(schedule_interval1, schedule_interval1).replace("*", "0")
    )
    second = five_field_cron.match(
        presets.get(schedule_interval2, schedule_interval2).replace("*", "0")
    )
    if first is None or second is None:
        return None

    # Seconds contributed by one unit of each supported cron field.
    # todo handle month and day of week
    seconds_per_unit = (
        ("hours", 60 * 60),
        ("minutes", 60),
        ("day", 24 * 60 * 60),
    )
    delta = sum(
        (int(second.group(field)) - int(first.group(field))) * factor
        for field, factor in seconds_per_unit
    )
    return f"{delta}s"