in metaflow/plugins/datatools/s3/s3op.py [0:0]
def worker(result_file_name, queue, mode, s3config):
    # Interpret the mode: it is either a single op or a combined one like
    # info_download or info_upload, which implies:
    #  - for download: we also need to return the object's information
    #  - for upload: we must not overwrite the file if it already exists
    modes = mode.split("_")
    pre_op_info = False
    if len(modes) > 1:
        pre_op_info = True
        mode = modes[1]
    else:
        mode = modes[0]
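    # op_info issues a HEAD request for a single key and returns either the
    # object's metadata or a normalized error code; note that `s3` and
    # `client_error` are only bound further down, before any URL is processed.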
    def op_info(url):
        try:
            head = s3.head_object(Bucket=url.bucket, Key=url.path)
            to_return = {
                "error": None,
                "size": head["ContentLength"],
                "content_type": head["ContentType"],
                "encryption": head.get("ServerSideEncryption"),
                "metadata": head["Metadata"],
                "last_modified": get_timestamp(head["LastModified"]),
            }
        except client_error as err:
            error_code = normalize_client_error(err)
            if error_code == 404:
                to_return = {"error": ERROR_URL_NOT_FOUND, "raise_error": err}
            elif error_code == 403:
                to_return = {"error": ERROR_URL_ACCESS_DENIED, "raise_error": err}
            elif error_code == 416:
                to_return = {"error": ERROR_INVALID_RANGE, "raise_error": err}
            elif error_code in (500, 502, 503, 504):
                to_return = {"error": ERROR_TRANSIENT, "raise_error": err}
            else:
                to_return = {"error": error_code, "raise_error": err}
        return to_return
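    # Each operation's outcome is reported as one line "<idx> <value>" in the
    # result file: a non-negative value is a size (or 0 for a completed upload)
    # and a negative value is one of the ERROR_* codes.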
    with open(result_file_name, "w") as result_file:
        try:
            from metaflow.plugins.datatools.s3.s3util import get_s3_client

            s3, client_error = get_s3_client(
                s3_role_arn=s3config.role,
                s3_session_vars=s3config.session_vars,
                s3_client_params=s3config.client_params,
            )
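            # Pull (url, idx) pairs off the queue until an item with url=None
            # signals that there is no more work for this worker.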
            while True:
                url, idx = queue.get()
                if url is None:
                    break
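                # "info": dump the HEAD result as JSON to url.local and report
                # either the object's size or a negative error code.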
if mode == "info":
result = op_info(url)
orig_error = result.get("raise_error", None)
if orig_error:
del result["raise_error"]
with open(url.local, "w") as f:
json.dump(result, f)
result_file.write(
"%d %d\n"
% (idx, -1 * result["error"] if orig_error else result["size"])
)
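                # "download": fetch the object (optionally a byte range) into a
                # temporary file and rename it into place once complete.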
elif mode == "download":
tmp = NamedTemporaryFile(dir=".", mode="wb", delete=False)
try:
try:
if url.range:
resp = s3.get_object(
Bucket=url.bucket, Key=url.path, Range=url.range
)
range_result = resp["ContentRange"]
range_result_match = RANGE_MATCH.match(range_result)
if range_result_match is None:
raise RuntimeError(
"Wrong format for ContentRange: %s"
% str(range_result)
)
range_result = {
x: int(range_result_match.group(x))
for x in ["total", "start", "end"]
}
else:
resp = s3.get_object(Bucket=url.bucket, Key=url.path)
range_result = None
sz = resp["ContentLength"]
if range_result is None:
range_result = {"total": sz, "start": 0, "end": sz - 1}
if not url.range and sz > DOWNLOAD_FILE_THRESHOLD:
# In this case, it is more efficient to use download_file as it
# will download multiple parts in parallel (it does it after
# multipart_threshold)
s3.download_file(url.bucket, url.path, tmp.name)
else:
read_in_chunks(
tmp, resp["Body"], sz, DOWNLOAD_MAX_CHUNK
)
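                            # The download succeeded: close the temporary file and
                            # publish it under its final name.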
                            tmp.close()
                            os.rename(tmp.name, url.local)
                        except client_error as err:
                            tmp.close()
                            os.unlink(tmp.name)
                            handle_client_error(err, idx, result_file)
                            continue
                        except RetriesExceededError as e:
                            tmp.close()
                            os.unlink(tmp.name)
                            err = convert_to_client_error(e)
                            handle_client_error(err, idx, result_file)
                            continue
                        except OSError as e:
                            tmp.close()
                            os.unlink(tmp.name)
                            if e.errno == errno.ENOSPC:
                                result_file.write(
                                    "%d %d\n" % (idx, -ERROR_OUT_OF_DISK_SPACE)
                                )
                            else:
                                result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
                            result_file.flush()
                            continue
                    except (SSLError, Exception) as e:
                        tmp.close()
                        os.unlink(tmp.name)
                        # assume anything else is transient
                        result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
                        result_file.flush()
                        continue
                    # If we need the metadata, get it and write it out
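                    # The metadata is written to a sidecar file named
                    # "<local>_meta" next to the downloaded file.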
                    if pre_op_info:
                        with open("%s_meta" % url.local, mode="w") as f:
                            # Collect the object's information (size, range,
                            # content type, metadata, encryption, last modified)
                            args = {
                                "size": resp["ContentLength"],
                                "range_result": range_result,
                            }
                            if resp["ContentType"]:
                                args["content_type"] = resp["ContentType"]
                            if resp["Metadata"] is not None:
                                args["metadata"] = resp["Metadata"]
                            if resp.get("ServerSideEncryption") is not None:
                                args["encryption"] = resp["ServerSideEncryption"]
                            if resp["LastModified"]:
                                args["last_modified"] = get_timestamp(
                                    resp["LastModified"]
                                )
                            json.dump(args, f)
                    # Finally, we push out the size to the result file since the
                    # size is used for verification and other purposes, and we
                    # want to avoid extra file operations for this simple value
                    result_file.write("%d %d\n" % (idx, resp["ContentLength"]))
                else:
                    # This is an upload; if we have a pre-op, it means we must
                    # not overwrite an existing object
                    do_upload = False
                    if pre_op_info:
                        result_info = op_info(url)
                        if result_info["error"] == ERROR_URL_NOT_FOUND:
                            # We only upload if the object is not already there
                            do_upload = True
                    else:
                        # No pre-op, so we always upload
                        do_upload = True
                    if do_upload:
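                        # Build ExtraArgs for upload_file only when needed:
                        # content type, user metadata and/or server-side encryption.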
                        extra = None
                        if url.content_type or url.metadata or url.encryption:
                            extra = {}
                            if url.content_type:
                                extra["ContentType"] = url.content_type
                            if url.metadata is not None:
                                extra["Metadata"] = url.metadata
                            if url.encryption is not None:
                                extra["ServerSideEncryption"] = url.encryption
                        try:
                            try:
                                s3.upload_file(
                                    url.local, url.bucket, url.path, ExtraArgs=extra
                                )
                                # We indicate that the file was uploaded
                                result_file.write("%d %d\n" % (idx, 0))
                            except client_error as err:
                                # Shouldn't get here, but just in case: boto3's
                                # transfer layer normally wraps ClientError in a
                                # S3UploadFailedError.
                                # See https://github.com/boto/boto3/blob/develop/boto3/s3/transfer.py#L377
                                handle_client_error(err, idx, result_file)
                                continue
                            except S3UploadFailedError as e:
                                err = convert_to_client_error(e)
                                handle_client_error(err, idx, result_file)
                                continue
                        except (SSLError, Exception) as e:
                            # assume anything else is transient
                            result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
                            result_file.flush()
                            continue
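        # Anything else is an unexpected worker-level failure: print the
        # traceback and exit with ERROR_WORKER_EXCEPTION so it is distinguishable
        # from per-URL errors reported in the result file.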
        except:
            traceback.print_exc()
            result_file.flush()
            sys.exit(ERROR_WORKER_EXCEPTION)