def _s3op_with_retries()

in metaflow/plugins/datatools/s3/s3.py [0:0]


    def _s3op_with_retries(self, mode, **options):
        """
        Run an s3op operation in a subprocess, retrying transiently failed inputs.

        Parameters
        ----------
        mode : str
            Sub-command passed as the first argument to s3op.py. This function
            special-cases "get" (for the `recursive` option) and "list" (no
            failure injection); any other value is passed through as-is.
        **options
            Options converted to s3op command line flags. Underscores in the
            option name become dashes; `True` becomes `--<name>`, `False` becomes
            `--no-<name>` and any other value becomes `--<name> <value>`. The
            `inputs` option is not forwarded directly: it is the name of the
            file used as `--inputs` for the first attempt (later attempts use a
            temporary file containing only the inputs left to retry).

        Returns
        -------
        Tuple[List[bytes], Optional[str]]
            - The output lines produced by s3op with their leading index
              removed, ordered by that index; positions for which no result was
              produced are dropped.
            - An error description; falsy if and only if every input was
              processed. When it is set, the list may contain partial results.

        Raises
        ------
        MetaflowS3NotFound
            If s3op exits with `s3op.ERROR_URL_NOT_FOUND`.
        MetaflowS3AccessDenied
            If s3op exits with `s3op.ERROR_URL_ACCESS_DENIED`.
        MetaflowS3InvalidRange
            If s3op exits with `s3op.ERROR_INVALID_RANGE`.
        """
        from . import s3op

        # High level note on what this function does:
        #  - perform s3op (which calls s3op.py in a subprocess to parallelize the
        #    operation). Typically this operation has several inputs (for example,
        #    multiple files to get or put)
        #  - the result of this operation can be either:
        #    - a known permanent failure (access denied for example) in which case we
        #      raise the corresponding exception.
        #    - a transient failure (exit code s3op.ERROR_TRANSIENT) in which case we
        #      will retry *only* the inputs that s3op reported as needing a retry.
        #    - any other non-zero exit code, in which case we stop and return the
        #      error (along with whatever succeeded) to the caller.
        #
        # NOTES(npow): 2025-05-13
        # Previously, this code would also retry the fatal failures, including no_progress
        # and unknown failures, from the beginning. This is not ideal because:
        # 1. Fatal errors are not supposed to be retried.
        # 2. Retrying from the beginning does not improve the situation, and is
        #    wasteful since we have already uploaded some files.
        # 3. The number of transient errors is far more than fatal errors, so we
        #    can be optimistic and assume the unknown errors are transient.
        cmdline = [sys.executable, os.path.abspath(s3op.__file__), mode]
        recursive_get = False
        for key, value in options.items():
            key = key.replace("_", "-")
            if isinstance(value, bool):
                if value:
                    if mode == "get" and key == "recursive":
                        # We make a note of this because for transient retries, we
                        # don't pass the recursive flag since we already did all the
                        # listing we needed
                        recursive_get = True
                    else:
                        cmdline.append("--%s" % key)
                else:
                    cmdline.append("--no-%s" % key)
            elif key == "inputs":
                base_input_filename = value
            else:
                cmdline.extend(("--%s" % key, value))
        if self._s3_role is not None:
            cmdline.extend(("--s3role", self._s3_role))
        if self._s3_session_vars is not None:
            cmdline.extend(("--s3sessionvars", json.dumps(self._s3_session_vars)))
        if self._s3_client_params is not None:
            cmdline.extend(("--s3clientparams", json.dumps(self._s3_client_params)))

        def _inject_failure_rate():
            # list mode does not do retries on transient failures (there is no
            # SlowDown handling) so we never inject a failure rate
            if mode == "list":
                return 0
            # Otherwise, we cap the failure rate at 90%
            return min(90, self._s3_inject_failures)

        transient_retry_count = 0  # Number of rounds that ended with transient failures
        inject_failures = _inject_failure_rate()
        out_lines = []  # List to contain the lines returned by _s3op_with_retries
        pending_retries = (
            []
        )  # Inputs that need to be retried due to a transient failure
        loop_count = 0
        last_ok_count = 0  # Number of inputs that were successful in the last try
        total_ok_count = 0  # Total number of OK inputs
        # Overwritten by the first attempt; only survives if the loop below never
        # runs, in which case nothing was done and we must not report success.
        err_out = "No S3 operation was attempted (S3_TRANSIENT_RETRY_COUNT is negative)"

        def _update_out_lines(out_lines, ok_lines, resize=False):
            # Store each result line of `ok_lines` at the position given by its
            # leading index; lines flagged as transient retries are skipped.
            if resize:
                # This is the first time around; we make the list big enough. Typically,
                # there is nothing in out_lines but in some cases (a retry after a
                # partial result), there may be stuff in it
                out_lines.extend([None] * (len(ok_lines) - len(out_lines)))
            for line in ok_lines:
                idx, rest = line.split(b" ", maxsplit=1)
                if rest.decode(encoding="utf-8") != TRANSIENT_RETRY_LINE_CONTENT:
                    # Update the proper location in the out_lines array; we maintain
                    # position as if transient retries did not exist. This
                    # makes sure that order is respected even in the presence of
                    # transient retries.
                    out_lines[int(idx.decode(encoding="utf-8"))] = rest

        def try_s3_op(last_ok_count, pending_retries, out_lines, inject_failures):
            # Runs s3op once and returns a tuple:
            #   (number of result lines, number of inputs to retry,
            #    next failure injection rate, error description or None)
            # NOTE: Make sure to update pending_retries and out_lines in place
            addl_cmdline = []
            if len(pending_retries) == 0 and recursive_get:
                # Nothing is pending so we are using the caller's original inputs
                # which still need the recursive listing
                addl_cmdline = ["--recursive"]
            with NamedTemporaryFile(
                dir=self._tmpdir,
                mode="wb+",
                delete=not debug.s3client,
                prefix="metaflow.s3op.stderr.",
            ) as stderr:
                with NamedTemporaryFile(
                    dir=self._tmpdir,
                    mode="wb",
                    delete=not debug.s3client,
                    prefix="metaflow.s3op.transientretry.",
                ) as tmp_input:
                    if len(pending_retries) > 0:
                        # We try a little bit more than the previous success (to still
                        # be aggressive but not too much). If there is a lot of
                        # transient errors and we are having issues pushing through
                        # things, this will shrink more and more until we are doing a
                        # single operation at a time. If things start going better, it
                        # will increase by 20% every round.
                        #
                        # If we made no progress (last_ok_count == 0) we retry at most
                        # 2*S3_WORKER_COUNT from whatever is left in `pending_retries`
                        max_count = min(
                            int(last_ok_count * 1.2), len(pending_retries)
                        ) or min(2 * S3_WORKER_COUNT, len(pending_retries))
                        tmp_input.writelines(pending_retries[:max_count])
                        tmp_input.flush()
                        debug.s3client_exec(
                            "Have %d pending; succeeded in %d => trying for %d and "
                            "leaving %d for the next round"
                            % (
                                len(pending_retries),
                                last_ok_count,
                                max_count,
                                len(pending_retries) - max_count,
                            )
                        )
                        del pending_retries[:max_count]

                        input_filename = tmp_input.name
                    else:
                        input_filename = base_input_filename

                    addl_cmdline.extend(["--inputs", input_filename])

                    # Check if we want to inject failures (for testing)
                    if inject_failures > 0:
                        addl_cmdline.extend(["--inject-failure", str(inject_failures)])
                        # Logic here is to have higher and lower failure rates to try to
                        # exercise as much of the code as possible. The failure rate
                        # trends towards 0.
                        if loop_count % 2 == 0:
                            inject_failures = int(inject_failures / 3)
                        else:
                            # We cap at 90 (and not 100) for injection of failures to
                            # reduce the likelihood of having flaky test. If the
                            # failure injection rate is too high, this can cause actual
                            # retries more often and then lead to too many actual
                            # retries
                            inject_failures = min(90, int(inject_failures * 1.5))
                    try:
                        debug.s3client_exec(cmdline + addl_cmdline)
                        # Run the operation.
                        env = os.environ.copy()
                        tracing.inject_tracing_vars(env)
                        env["METAFLOW_ESCAPE_HATCH_WARNING"] = "False"
                        stdout = subprocess.check_output(
                            cmdline + addl_cmdline,
                            cwd=self._tmpdir,
                            stderr=stderr.file,
                            env=env,
                        )
                        # Here we did not have any error -- transient or otherwise.
                        ok_lines = stdout.splitlines()
                        _update_out_lines(out_lines, ok_lines, resize=loop_count == 0)
                        return (len(ok_lines), 0, inject_failures, None)
                    except subprocess.CalledProcessError as ex:
                        if ex.returncode == s3op.ERROR_TRANSIENT:
                            # In this special case, we failed transiently on *some* of
                            # the files but not necessarily all. This is typically
                            # caused by limits on the number of operations that can
                            # occur per second or some other temporary limitation.
                            # We will retry only those that we failed on. The caller
                            # counts every such round against
                            # S3_TRANSIENT_RETRY_COUNT.
                            ok_lines = ex.stdout.splitlines()
                            stderr.seek(0)
                            do_output = False
                            retry_lines = []
                            for line in stderr:
                                if do_output:
                                    retry_lines.append(line)
                                    continue
                                if (
                                    line.decode(encoding="utf-8")
                                    == "%s\n" % TRANSIENT_RETRY_START_LINE
                                ):
                                    # Look for a special marker as the start of the
                                    # "failed inputs that need to be retried"
                                    do_output = True
                            stderr.seek(0)
                            if do_output is False:
                                return (
                                    0,
                                    0,
                                    inject_failures,
                                    "Could not find inputs to retry",
                                )
                            else:
                                _update_out_lines(
                                    out_lines, ok_lines, resize=loop_count == 0
                                )
                                pending_retries.extend(retry_lines)

                                return (
                                    len(ok_lines),
                                    len(retry_lines),
                                    inject_failures,
                                    None,
                                )

                        # Here, this is a "normal" failure that we need to send back up.
                        # These failures are not retried.
                        stderr.seek(0)
                        err_out = stderr.read().decode("utf-8", errors="replace")
                        stderr.seek(0)
                        if ex.returncode == s3op.ERROR_URL_NOT_FOUND:
                            raise MetaflowS3NotFound(err_out)
                        elif ex.returncode == s3op.ERROR_URL_ACCESS_DENIED:
                            raise MetaflowS3AccessDenied(err_out)
                        elif ex.returncode == s3op.ERROR_INVALID_RANGE:
                            raise MetaflowS3InvalidRange(err_out)

                        # Here, this is some other error that we report back to the
                        # caller (it is not retried). We still update the successful
                        # lines
                        ok_lines = ex.stdout.splitlines()
                        _update_out_lines(out_lines, ok_lines, resize=loop_count == 0)
                        if not err_out:
                            # The subprocess failed without writing anything to
                            # stderr (for example if it was killed by a signal). An
                            # empty message is falsy and would be mistaken for
                            # success, so we always report something.
                            err_out = (
                                "s3op exited with code %d without any error output"
                                % ex.returncode
                            )
                        return 0, 0, inject_failures, err_out

        while transient_retry_count <= S3_TRANSIENT_RETRY_COUNT:
            (
                last_ok_count,
                last_retry_count,
                inject_failures,
                err_out,
            ) = try_s3_op(last_ok_count, pending_retries, out_lines, inject_failures)
            if err_out:
                break
            if last_retry_count != 0:
                # During our last try, we did not manage to process everything we wanted
                # due to a transient failure so we try again.
                transient_retry_count += 1
                total_ok_count += last_ok_count
                print(
                    "Transient S3 failure (attempt #%d) -- total success: %d, "
                    "last attempt %d/%d -- remaining: %d"
                    % (
                        transient_retry_count,
                        total_ok_count,
                        last_ok_count,
                        last_ok_count + last_retry_count,
                        len(pending_retries),
                    )
                )
                if inject_failures == 0:
                    # Don't sleep when we are "faking" the failures
                    self._jitter_sleep(transient_retry_count)

            loop_count += 1
            # If we have no more things to try, we break out of the loop.
            if len(pending_retries) == 0:
                break

        if not err_out and pending_retries:
            # We ran out of transient retries while some inputs were still waiting
            # to be processed. Report it: returning a falsy error here would make
            # the caller treat partial results as a complete success.
            err_out = (
                "Too many transient S3 failures: %d input(s) were not processed "
                "after %d transient retries"
                % (len(pending_retries), transient_retry_count)
            )

        # At this point, we check out_lines; strip None which can happen for puts that
        # didn't upload files
        return [o for o in out_lines if o is not None], err_out