in metaflow/plugins/datatools/s3/s3.py [0:0]
def _s3op_with_retries(self, mode, **options):
from . import s3op
# High-level note on what this function does:
# - perform s3op (which calls s3op.py in a subprocess to parallelize the
# operation). Typically this operation has several inputs (for example,
# multiple files to get or put)
# - the result of this operation can be one of:
# - a known permanent failure (access denied for example) in which case we
# surface this failure (for example by raising MetaflowS3AccessDenied).
# - a known transient failure (SlowDown for example) in which case we will
# retry *only* the inputs that hit this transient failure.
# - an unknown failure (something went wrong but we cannot say if it was
# a known permanent failure or something else). In this case, we assume
# it's a transient failure and retry only those inputs (same as above).
#
# NOTES(npow): 2025-05-13
# Previously, this code would also retry the fatal failures, including no_progress
# and unknown failures, from the beginning. This is not ideal because:
# 1. Fatal errors are not supposed to be retried.
# 2. Retrying from the beginning does not improve the situation, and is
# wasteful since we have already uploaded some files.
# 3. Transient errors are far more common than fatal errors, so we can
# be optimistic and assume the unknown errors are transient.
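#
# The flow below is therefore: build the base command line once, then loop,
# calling try_s3_op and retrying only the transiently failed inputs, until
# everything succeeds, a non-retriable or unknown error occurs, or we exceed
# S3_TRANSIENT_RETRY_COUNT transient retries.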
cmdline = [sys.executable, os.path.abspath(s3op.__file__), mode]
recursive_get = False
for key, value in options.items():
key = key.replace("_", "-")
if isinstance(value, bool):
if value:
if mode == "get" and key == "recursive":
# We make a note of this because transient retries do not pass the
# recursive flag again; by then we have already done all the listing
# we needed.
recursive_get = True
else:
cmdline.append("--%s" % key)
else:
cmdline.append("--no-%s" % key)
elif key == "inputs":
base_input_filename = value
else:
cmdline.extend(("--%s" % key, value))
if self._s3_role is not None:
cmdline.extend(("--s3role", self._s3_role))
if self._s3_session_vars is not None:
cmdline.extend(("--s3sessionvars", json.dumps(self._s3_session_vars)))
if self._s3_client_params is not None:
cmdline.extend(("--s3clientparams", json.dumps(self._s3_client_params)))
def _inject_failure_rate():
# list mode does not do retries on transient failures (there is no
# SlowDown handling) so we never inject a failure rate
if mode == "list":
return 0
# Otherwise, we cap the failure rate at 90%
return min(90, self._s3_inject_failures)
transient_retry_count = 0 # Number of transient retries (per top-level retry)
inject_failures = _inject_failure_rate()
out_lines = [] # List to contain the lines returned by _s3op_with_retries
pending_retries = (
[]
) # Inputs that need to be retried due to a transient failure
loop_count = 0
last_ok_count = 0 # Number of inputs that were successful in the last try
total_ok_count = 0 # Total number of OK inputs
def _reset():
nonlocal transient_retry_count, inject_failures, out_lines, pending_retries
nonlocal loop_count, last_ok_count, total_ok_count
transient_retry_count = 0
inject_failures = _inject_failure_rate()
if mode != "put":
# For put, even after retries, we keep around whatever we already
# uploaded. This is because uploading with overwrite=False is not
# an idempotent operation, so some files could have been uploaded
# during the first try and we should report those back.
out_lines = []
pending_retries = []
loop_count = 0
last_ok_count = 0
total_ok_count = 0 # Reset to zero even if we keep out_lines
def _update_out_lines(out_lines, ok_lines, resize=False):
if resize:
# This is the first time around; we make the list big enough. Typically,
# out_lines is empty but in some cases (a retry after a partial result),
# it may already contain entries.
out_lines.extend([None] * (len(ok_lines) - len(out_lines)))
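# Each line in ok_lines is assumed to look like b"<idx> <payload>" where
# <idx> is the position of the corresponding input in the original input
# file and <payload> is either the per-input result (format depends on the
# s3op mode) or TRANSIENT_RETRY_LINE_CONTENT for inputs that must be retried.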
for l in ok_lines:
idx, rest = l.split(b" ", maxsplit=1)
if rest.decode(encoding="utf-8") != TRANSIENT_RETRY_LINE_CONTENT:
# Update the proper location in the out_lines array; we maintain
# position as if transient retries did not exist. This
# makes sure that order is respected even in the presence of
# transient retries.
out_lines[int(idx.decode(encoding="utf-8"))] = rest
def try_s3_op(last_ok_count, pending_retries, out_lines, inject_failures):
# NOTE: Make sure to update pending_retries and out_lines in place
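# Returns a 4-tuple: (number of inputs that succeeded in this attempt,
# number of inputs queued for a transient retry, the updated
# failure-injection rate, and an error message for non-retriable or
# unexpected failures -- None if there was no such error).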
addl_cmdline = []
if len(pending_retries) == 0 and recursive_get:
# First time around (or after a fatal failure)
addl_cmdline = ["--recursive"]
with NamedTemporaryFile(
dir=self._tmpdir,
mode="wb+",
delete=not debug.s3client,
prefix="metaflow.s3op.stderr.",
) as stderr:
with NamedTemporaryFile(
dir=self._tmpdir,
mode="wb",
delete=not debug.s3client,
prefix="metaflow.s3op.transientretry.",
) as tmp_input:
if len(pending_retries) > 0:
# We try a little bit more than the previous success count (to remain
# aggressive but not overly so). If there are a lot of transient
# errors and we are struggling to make progress, this will shrink
# more and more until we are doing a single operation at a time. If
# things start going better, it will increase by 20% every round.
#
# If we made no progress (last_ok_count == 0), we retry at most
# 2*S3_WORKER_COUNT of whatever is left in `pending_retries`.
max_count = min(
int(last_ok_count * 1.2), len(pending_retries)
) or min(2 * S3_WORKER_COUNT, len(pending_retries))
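# Illustrative example: with last_ok_count=10 and 50 pending inputs, we
# retry int(10 * 1.2) == 12 inputs this round; with last_ok_count=0, we
# fall back to min(2 * S3_WORKER_COUNT, 50) inputs.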
tmp_input.writelines(pending_retries[:max_count])
tmp_input.flush()
debug.s3client_exec(
"Have %d pending; succeeded in %d => trying for %d and "
"leaving %d for the next round"
% (
len(pending_retries),
last_ok_count,
max_count,
len(pending_retries) - max_count,
)
)
del pending_retries[:max_count]
input_filename = tmp_input.name
else:
input_filename = base_input_filename
addl_cmdline.extend(["--inputs", input_filename])
# Check if we want to inject failures (for testing)
if inject_failures > 0:
addl_cmdline.extend(["--inject-failure", str(inject_failures)])
# The logic here alternates between lower and higher failure rates to
# exercise as much of the code as possible; the failure rate trends
# towards 0 over time.
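# Illustrative decay starting at 90 (rate used per attempt):
# 90, 30, 45, 15, 22, 7, ... until it reaches 0.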
if loop_count % 2 == 0:
inject_failures = int(inject_failures / 3)
else:
# We cap at 90 (and not 100) when injecting failures to reduce
# the likelihood of flaky tests: if the injection rate is too
# high, genuine retries get triggered more often, which can lead
# to too many actual retries.
inject_failures = min(90, int(inject_failures * 1.5))
try:
debug.s3client_exec(cmdline + addl_cmdline)
# Run the operation.
env = os.environ.copy()
tracing.inject_tracing_vars(env)
env["METAFLOW_ESCAPE_HATCH_WARNING"] = "False"
stdout = subprocess.check_output(
cmdline + addl_cmdline,
cwd=self._tmpdir,
stderr=stderr.file,
env=env,
)
# Here we did not have any error -- transient or otherwise.
ok_lines = stdout.splitlines()
_update_out_lines(out_lines, ok_lines, resize=loop_count == 0)
return (len(ok_lines), 0, inject_failures, None)
except subprocess.CalledProcessError as ex:
if ex.returncode == s3op.ERROR_TRANSIENT:
# In this special case, we failed transiently on *some* of
# the files but not necessarily all. This is typically
# caused by limits on the number of operations that can
# occur per second or some other temporary limitation.
# We will retry only those that we failed on and we will not
# count this as a retry *unless* we are making no forward
# progress. In effect, we consider that as long as *some*
# operations are going through, we should just keep going as
# if it was a single operation.
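# s3op is expected to print TRANSIENT_RETRY_START_LINE on stderr
# followed by one line per input that should be retried; we collect
# those lines below and feed them back as the next attempt's input
# file.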
ok_lines = ex.stdout.splitlines()
stderr.seek(0)
do_output = False
retry_lines = []
for l in stderr:
if do_output:
retry_lines.append(l)
continue
if (
l.decode(encoding="utf-8")
== "%s\n" % TRANSIENT_RETRY_START_LINE
):
# This special marker denotes the start of the list of
# failed inputs that need to be retried.
do_output = True
stderr.seek(0)
if do_output is False:
return (
0,
0,
inject_failures,
"Could not find inputs to retry",
)
else:
_update_out_lines(
out_lines, ok_lines, resize=loop_count == 0
)
pending_retries.extend(retry_lines)
return (
len(ok_lines),
len(retry_lines),
inject_failures,
None,
)
# Here, this is a "normal" failure that we need to send back up.
# These failures are not retried.
stderr.seek(0)
err_out = stderr.read().decode("utf-8", errors="replace")
stderr.seek(0)
if ex.returncode == s3op.ERROR_URL_NOT_FOUND:
raise MetaflowS3NotFound(err_out)
elif ex.returncode == s3op.ERROR_URL_ACCESS_DENIED:
raise MetaflowS3AccessDenied(err_out)
elif ex.returncode == s3op.ERROR_INVALID_RANGE:
raise MetaflowS3InvalidRange(err_out)
# Here, this is some other (unknown) error; we report it back to the
# caller. We still record any successful lines first.
ok_lines = ex.stdout.splitlines()
_update_out_lines(out_lines, ok_lines, resize=loop_count == 0)
return 0, 0, inject_failures, err_out
while transient_retry_count <= S3_TRANSIENT_RETRY_COUNT:
(
last_ok_count,
last_retry_count,
inject_failures,
err_out,
) = try_s3_op(last_ok_count, pending_retries, out_lines, inject_failures)
if err_out:
break
if last_retry_count != 0:
# During our last try, we did not manage to process everything we wanted
# due to a transient failure so we try again.
transient_retry_count += 1
total_ok_count += last_ok_count
print(
"Transient S3 failure (attempt #%d) -- total success: %d, "
"last attempt %d/%d -- remaining: %d"
% (
transient_retry_count,
total_ok_count,
last_ok_count,
last_ok_count + last_retry_count,
len(pending_retries),
)
)
if inject_failures == 0:
# Don't sleep when we are "faking" the failures
self._jitter_sleep(transient_retry_count)
loop_count += 1
# If we have no more things to try, we break out of the loop.
if len(pending_retries) == 0:
break
# At this point, we return out_lines, stripping None entries which can
# happen for puts that did not upload files.
return [o for o in out_lines if o is not None], err_out