probe_scraper/remote_storage.py
import fnmatch
import gzip
import subprocess
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import List, Optional, Tuple, Union
from .exc import ProbeScraperServerError

TEXT_HTML = "text/html"
APPLICATION_JSON = "application/json"
INDEX_HTML = "index.html"

def _call(args: List[str]):
    """Run *args* as a subprocess, printing its combined stdout/stderr on
    success and raising ProbeScraperServerError with that output otherwise."""
process = subprocess.run(
args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
if process.returncode == 0:
print(process.stdout, end="")
else:
raise ProbeScraperServerError(
f"Command {args!r} returned non-zero exit status {process.returncode}: "
+ process.stdout
)
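
# For illustration, a hypothetical invocation such as
# _call(["aws", "s3", "ls", "s3://example-bucket"]) prints the command's
# combined stdout/stderr when it exits 0, and otherwise raises
# ProbeScraperServerError carrying that output.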

def _s3_sync(
src: Union[str, Path],
dst: Union[str, Path],
delete: bool = False,
exclude: Tuple[str, ...] = (),
acl: Optional[str] = None,
content_type: Optional[str] = None,
content_encoding: Optional[str] = None,
cache_control: Optional[str] = None,
):
    """Sync or copy *src* to *dst* with the AWS CLI.

    `aws s3 sync` only accepts directories, so a single file is uploaded
    with `aws s3 cp` instead.
    """
    if isinstance(src, Path) and src.is_file():
        # a single file must be uploaded with cp
        s3_cmd = "cp"
    else:
        s3_cmd = "sync"
_call(
["aws", "s3", s3_cmd, "--only-show-errors", str(src), str(dst)]
+ (["--delete"] if delete else [])
+ [
arg
for key, value in zip(
(
*("--exclude" for _ in exclude),
"--content-type",
"--content-encoding",
"--cache-control",
"--acl",
),
(
*exclude,
content_type,
content_encoding,
cache_control,
acl,
),
)
if value is not None
for arg in (key, value)
]
)
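
# As a sketch (hypothetical bucket and paths), _s3_sync(Path("output"),
# "s3://example-bucket/data", delete=True, content_type="application/json",
# cache_control="max-age=28800") would run roughly:
#   aws s3 sync --only-show-errors output s3://example-bucket/data --delete \
#       --content-type application/json --cache-control max-age=28800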

def _gcs_sync(
src: Union[str, Path],
dst: Union[str, Path],
delete: bool = False,
exclude: Tuple[str, ...] = (),
content_type: Optional[str] = None,
content_encoding: Optional[str] = None,
cache_control: Optional[str] = None,
acl: Optional[str] = None,
):
    """Sync or copy *src* to *dst* with gsutil.

    `gsutil rsync` only accepts directories, so a single file is uploaded
    with `gsutil cp`, which supports neither deletion nor excludes.
    """
    if isinstance(src, Path) and src.is_file():
        # a single file must be uploaded with cp
        gsutil_cmd = ["cp"]
if delete:
raise ValueError("cannot delete when uploading a single file")
if exclude:
raise ValueError("cannot exclude when uploading a single file")
else:
gsutil_cmd = ["rsync", "-r"]
_call(
["gsutil", "-q", "-m"]
# -h flags are global and must appear before the rsync/cp command
+ [
arg
for header, value in zip(
["Content-Type", "Content-Encoding", "Cache-Control"],
[content_type, content_encoding, cache_control],
)
if value is not None
for arg in ("-h", f"{header}:{value}")
]
+ gsutil_cmd
# command specific options must appear before src and dst
+ (["-d"] if delete else [])
# translate excludes from glob to regex before passing to gsutil
+ [arg for item in exclude for arg in ("-x", fnmatch.translate(item))]
+ (["-a", acl] if acl is not None else [])
+ [str(src), str(dst)]
)
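
# As a sketch (hypothetical bucket and paths), _gcs_sync(Path("output"),
# "gs://example-bucket/data", delete=True, content_type="application/json")
# would run roughly:
#   gsutil -q -m -h Content-Type:application/json rsync -r -d \
#       output gs://example-bucket/data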

def _get_sync_function(remote: str):
    """Return the sync helper matching the scheme of *remote* (s3:// or gs://)."""
if remote.startswith("s3://"):
return _s3_sync
elif remote.startswith("gs://"):
return _gcs_sync
else:
raise ValueError(
f"remote path must have scheme like s3:// or gs://, got: {remote!r}"
)
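
# For example, _get_sync_function("gs://example-bucket/data") returns _gcs_sync,
# while a bare local path like "output/" raises ValueError because it carries
# no recognized scheme.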

def remote_storage_pull(src: str, dst: Path, decompress: bool = False):
    """Download *src* into the local directory *dst*, gunzipping each file
    afterwards when *decompress* is set (gsutil already decompresses gzip
    content on download, so the extra step is skipped for gs:// sources)."""
sync = _get_sync_function(src)
if sync is _gcs_sync:
# gsutil will decompress files
decompress = False
# prevent error from gsutil when dst and src do not exist
dst.mkdir(parents=True, exist_ok=True)
if decompress:
with TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
sync(src, tmp_path)
for in_file in tmp_path.rglob("*"):
if not in_file.is_dir():
out_file = dst / in_file.relative_to(tmp_path)
out_file.parent.mkdir(parents=True, exist_ok=True)
out_file.write_bytes(gzip.decompress(in_file.read_bytes()))
else:
sync(src, dst)
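
# A minimal sketch of a pull (hypothetical bucket and paths):
#   remote_storage_pull("s3://example-bucket/data", Path("cache"), decompress=True)
# syncs into a temporary directory first, then gunzips every file into cache/.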

def remote_storage_push(src: Path, dst: str, compress: bool = False, **kwargs):
    """Upload the file or directory *src* to *dst*, optionally gzipping the
    contents and sending them with a gzip content encoding. Extra keyword
    arguments are passed through to the underlying sync function."""
sync = _get_sync_function(dst)
if compress:
kwargs["content_encoding"] = "gzip"
if "exclude" in kwargs:
raise NotImplementedError("exclude is not supported while compressing")
        # CloudFront is supposed to automatically gzip objects, but it won't do that
        # if the object size is > 10 megabytes (https://webmasters.stackexchange.com/a/111734),
        # which our files sometimes are. To work around this, as well as to support
        # Google Cloud Storage, we gzip the contents into a temporary directory and
        # upload that with an explicit content encoding.
with TemporaryDirectory() as tmp_name:
tmp = Path(tmp_name)
if src.is_dir():
for in_file in src.rglob("*"):
if not in_file.is_dir():
out_file = tmp / in_file.relative_to(src)
out_file.parent.mkdir(parents=True, exist_ok=True)
out_file.write_bytes(gzip.compress(in_file.read_bytes()))
index = tmp / INDEX_HTML
if index.exists():
# must be a tuple
kwargs["exclude"] = (INDEX_HTML,)
sync(
src=tmp,
dst=dst,
content_type=APPLICATION_JSON,
**kwargs,
)
if index.exists():
# cannot delete or exclude with a single file
kwargs["delete"] = False
kwargs["exclude"] = ()
sync(
src=index,
dst=dst,
content_type=TEXT_HTML,
**kwargs,
)
else:
tmp_file = tmp / src.name
tmp_file.write_bytes(gzip.compress(src.read_bytes()))
content_type = TEXT_HTML if src.name == INDEX_HTML else APPLICATION_JSON
sync(
src=tmp_file,
dst=dst,
content_type=content_type,
**kwargs,
)
else:
sync(src, dst, **kwargs)
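
# A minimal sketch of a push (hypothetical bucket and paths):
#   remote_storage_push(Path("output"), "s3://example-bucket/data", compress=True,
#                       delete=True, cache_control="max-age=28800")
# gzips output/ into a temporary directory, syncs the contents with
# Content-Encoding: gzip and an application/json content type, then uploads
# index.html (if present) separately so it gets a text/html content type.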