workflow/__init__.py
import os
import re
import shutil
import time
from datetime import datetime, timedelta
from pathlib import Path

import glog as log
from google.api_core import exceptions, page_iterator
from google.cloud import storage
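
# Run directories are named "<YYYYMMDD>-<n>/"; the trailing slash comes from
# listing GCS prefixes with a "/" delimiter.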
kIdentifierFormat = re.compile(r"(\d{8}-\d+)/?")
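# Bucket names of the form "local_test:<path>" are treated as a local test
# directory (<path>/db) instead of a real Google Cloud Storage bucket.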
kTestBucket = "local_test"
def get_test_dir(bucket_name):
    """Map a "local_test:<path>" bucket name to its local test db directory."""
    if not bucket_name.startswith(kTestBucket + ":"):
        raise ValueError(f"Expected bucket_name to start with '{kTestBucket}:'")
    return Path(bucket_name[len(kTestBucket) + 1 :]) / "db"
class FileNotFoundException(exceptions.NotFound):
pass
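# HTTPIterator requires an item_to_value callback; the listed prefixes are
# already the strings we want, so pass each item through unchanged.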
def _item_to_value(iterator, item):
return item
def list_google_storage_directories(bucket_name, *, prefix=None):
    if bucket_name.startswith(kTestBucket):
        # For a local test bucket, only the immediate subdirectories matter.
        for _, dirs, _ in os.walk(get_test_dir(bucket_name)):
            return dirs
        return []
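    # Listing with a "/" delimiter makes the GCS API return directory-like
    # prefixes (under "prefixes" in the response) rather than every object.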
extra_params = {"projection": "noAcl", "delimiter": "/"}
if prefix is not None:
if not prefix.endswith("/"):
prefix += "/"
extra_params["prefix"] = prefix
gcs = storage.Client()
path = "/b/" + bucket_name + "/o"
iterator = page_iterator.HTTPIterator(
client=gcs,
api_request=gcs._connection.api_request,
path=path,
items_key="prefixes",
item_to_value=_item_to_value,
extra_params=extra_params,
)
    return list(iterator)
def normalize_identifier(s):
"""The first part of the identifier is a date with no separators and is
obvious to sort. The second part is a number which is generally a
single digit, but in a degenerate case could end up with multiple, so
we pad it here.
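
    For example, "20240101-3/" normalizes to "20240101000003".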
"""
parts = s.rstrip("/").split("-")
return f"{parts[0]}{int(parts[1]):06d}"
def get_run_identifiers(bucket_name):
dirs = list_google_storage_directories(bucket_name)
    matches = (kIdentifierFormat.match(d) for d in dirs)
    identifiers = [m.group(1) for m in matches if m]
return sorted(identifiers, key=normalize_identifier)
def google_cloud_file_exists(bucket_name, remote):
if bucket_name.startswith(kTestBucket):
        return (get_test_dir(bucket_name) / remote).exists()
gcs = storage.Client()
bucket = gcs.get_bucket(bucket_name)
blob = storage.blob.Blob(remote, bucket)
return blob.exists()
def download_from_google_cloud_to_string(bucket_name, remote):
if bucket_name.startswith(kTestBucket):
        return (get_test_dir(bucket_name) / remote).read_bytes()
gcs = storage.Client()
bucket = gcs.get_bucket(bucket_name)
blob = storage.blob.Blob(remote, bucket)
if not blob.exists():
raise FileNotFoundException(f"{remote} does not exist")
return blob.download_as_string()
def download_from_google_cloud(bucket_name, remote, local):
if bucket_name.startswith(kTestBucket):
        shutil.copy(get_test_dir(bucket_name) / remote, local)
return
gcs = storage.Client()
bucket = gcs.get_bucket(bucket_name)
blob = storage.blob.Blob(remote, bucket)
if not blob.exists():
raise FileNotFoundException(f"{remote} does not exist")
with open(local, "wb") as file_obj:
blob.download_to_file(file_obj)
log.info(f"Downloaded {blob.public_url} to {local}")
def download_and_retry_from_google_cloud(
bucket_name, remote, local, *, timeout=timedelta(minutes=5)
):
time_start = datetime.now()
while True:
try:
return download_from_google_cloud(bucket_name, remote, local)
        except FileNotFoundException:
            time_waiting = datetime.now() - time_start
            if time_waiting >= timeout:
                raise
            log.warning(
                f"File {remote} not found, retrying (waiting={time_waiting}, "
                f"remaining={timeout - time_waiting})"
            )
            # Poll every 30 seconds until the file appears or the timeout elapses.
            time.sleep(30)
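# A minimal usage sketch (the bucket and object names below are hypothetical,
# not part of this module): pick the most recent run and fetch one of its
# files, waiting up to five minutes for it to appear.
#
#   runs = get_run_identifiers("my-results-bucket")
#   latest = runs[-1]
#   download_and_retry_from_google_cloud(
#       "my-results-bucket",
#       f"{latest}/results.json",
#       "/tmp/results.json",
#       timeout=timedelta(minutes=5),
#   )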