def worker()

in metaflow/plugins/datatools/s3/s3op.py
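
Worker process entry point: drains (url, idx) tasks from a shared queue,
performs the requested S3 operation (info, download, or upload), and writes
one "<idx> <n>" line per task to result_file_name, where n >= 0 signals
success (object size, or 0 for an upload) and n < 0 is a negated error code.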


def worker(result_file_name, queue, mode, s3config):
    # Interpret the mode: it is either a single op ("info", "download",
    # "upload") or a compound like "info_download"/"info_upload", which
    # implies:
    #  - for download: also return the object's metadata
    #  - for upload: do not overwrite the object if it already exists
    modes = mode.split("_")
    pre_op_info = False
    if len(modes) > 1:
        pre_op_info = True
        mode = modes[1]
    else:
        mode = modes[0]
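    # e.g. "download"      -> pre_op_info=False, mode="download"
    #      "info_download" -> pre_op_info=True,  mode="download"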

    def op_info(url):
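        # Note: `s3` and `client_error` are closed over from the enclosing
        # scope; they are bound below via get_s3_client before the first call.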
        try:
            head = s3.head_object(Bucket=url.bucket, Key=url.path)
            to_return = {
                "error": None,
                "size": head["ContentLength"],
                "content_type": head["ContentType"],
                "encryption": head.get("ServerSideEncryption"),
                "metadata": head["Metadata"],
                "last_modified": get_timestamp(head["LastModified"]),
            }
        except client_error as err:
            error_code = normalize_client_error(err)
            if error_code == 404:
                to_return = {"error": ERROR_URL_NOT_FOUND, "raise_error": err}
            elif error_code == 403:
                to_return = {"error": ERROR_URL_ACCESS_DENIED, "raise_error": err}
            elif error_code == 416:
                to_return = {"error": ERROR_INVALID_RANGE, "raise_error": err}
            elif error_code in (500, 502, 503, 504):
                to_return = {"error": ERROR_TRANSIENT, "raise_error": err}
            else:
                to_return = {"error": error_code, "raise_error": err}
        return to_return

    with open(result_file_name, "w") as result_file:
        try:
            from metaflow.plugins.datatools.s3.s3util import get_s3_client

            s3, client_error = get_s3_client(
                s3_role_arn=s3config.role,
                s3_session_vars=s3config.session_vars,
                s3_client_params=s3config.client_params,
            )
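            # get_s3_client returns the boto3 S3 client together with the
            # exception class to catch for S3 API errors (used as
            # `client_error` in the handlers below).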
            while True:
                url, idx = queue.get()
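                # A None URL is the shutdown sentinel: exit the loop.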
                if url is None:
                    break
                if mode == "info":
                    result = op_info(url)
                    orig_error = result.get("raise_error", None)
                    if orig_error:
                        del result["raise_error"]
                    with open(url.local, "w") as f:
                        json.dump(result, f)
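                    # Result line protocol: "<idx> <n>", where n >= 0 is the
                    # object size and n < 0 is a negated error code.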
                    result_file.write(
                        "%d %d\n"
                        % (idx, -1 * result["error"] if orig_error else result["size"])
                    )
                elif mode == "download":
                    tmp = NamedTemporaryFile(dir=".", mode="wb", delete=False)
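                    # Download into a temp file in the current directory and
                    # os.rename it into place on success, so a partial
                    # download is never visible at url.local.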
                    try:
                        try:
                            if url.range:
                                resp = s3.get_object(
                                    Bucket=url.bucket, Key=url.path, Range=url.range
                                )
                                range_result = resp["ContentRange"]
                                range_result_match = RANGE_MATCH.match(range_result)
                                if range_result_match is None:
                                    raise RuntimeError(
                                        "Wrong format for ContentRange: %s"
                                        % str(range_result)
                                    )
                                range_result = {
                                    x: int(range_result_match.group(x))
                                    for x in ["total", "start", "end"]
                                }
                            else:
                                resp = s3.get_object(Bucket=url.bucket, Key=url.path)
                                range_result = None
                            sz = resp["ContentLength"]
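                            # A full download is normalized to the range
                            # [0, sz - 1] so the metadata is uniform either way.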
                            if range_result is None:
                                range_result = {"total": sz, "start": 0, "end": sz - 1}
                            if not url.range and sz > DOWNLOAD_FILE_THRESHOLD:
                                # For large objects it is more efficient to use
                                # download_file, which fetches parts in parallel
                                # once the size exceeds the transfer config's
                                # multipart_threshold.
                                s3.download_file(url.bucket, url.path, tmp.name)
                            else:
                                read_in_chunks(
                                    tmp, resp["Body"], sz, DOWNLOAD_MAX_CHUNK
                                )
                            tmp.close()
                            os.rename(tmp.name, url.local)
                        except client_error as err:
                            tmp.close()
                            os.unlink(tmp.name)
                            handle_client_error(err, idx, result_file)
                            continue
                        except RetriesExceededError as e:
                            tmp.close()
                            os.unlink(tmp.name)
                            err = convert_to_client_error(e)
                            handle_client_error(err, idx, result_file)
                            continue
                        except OSError as e:
                            tmp.close()
                            os.unlink(tmp.name)
                            if e.errno == errno.ENOSPC:
                                result_file.write(
                                    "%d %d\n" % (idx, -ERROR_OUT_OF_DISK_SPACE)
                                )
                            else:
                                result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
                            result_file.flush()
                            continue
                    except Exception:
                        # SSLError is a subclass of Exception, so a plain
                        # Exception handler covers it; assume anything else
                        # is transient
                        tmp.close()
                        os.unlink(tmp.name)
                        result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
                        result_file.flush()
                        continue
                    # If we need the metadata, get it and write it out
                    if pre_op_info:
                        with open("%s_meta" % url.local, mode="w") as f:
                            # Persist the object's metadata (size, range,
                            # content type, encryption, user metadata and
                            # last-modified) as JSON alongside the file.
                            args = {
                                "size": resp["ContentLength"],
                                "range_result": range_result,
                            }
                            if resp["ContentType"]:
                                args["content_type"] = resp["ContentType"]
                            if resp["Metadata"] is not None:
                                args["metadata"] = resp["Metadata"]
                            if resp.get("ServerSideEncryption") is not None:
                                args["encryption"] = resp["ServerSideEncryption"]
                            if resp["LastModified"]:
                                args["last_modified"] = get_timestamp(
                                    resp["LastModified"]
                                )
                            json.dump(args, f)
                    # Finally, write the size to the result file: the size is
                    # used for verification and other purposes, and reporting
                    # it here saves the parent extra file operations.
                    result_file.write("%d %d\n" % (idx, resp["ContentLength"]))
                else:
                    # Upload mode: if pre_op_info is set, we must not
                    # overwrite an existing object
                    do_upload = False
                    if pre_op_info:
                        result_info = op_info(url)
                        if result_info["error"] == ERROR_URL_NOT_FOUND:
                            # We only upload if the file is not found
                            do_upload = True
                    else:
                        # No pre-op so we upload
                        do_upload = True
                    if do_upload:
                        extra = None
                        if url.content_type or url.metadata or url.encryption:
                            extra = {}
                            if url.content_type:
                                extra["ContentType"] = url.content_type
                            if url.metadata is not None:
                                extra["Metadata"] = url.metadata
                            if url.encryption is not None:
                                extra["ServerSideEncryption"] = url.encryption
                        try:
                            try:
                                s3.upload_file(
                                    url.local, url.bucket, url.path, ExtraArgs=extra
                                )
                                # We indicate that the file was uploaded
                                result_file.write("%d %d\n" % (idx, 0))
                            except client_error as err:
                                # Shouldn't get here, but just in case:
                                # boto3's transfer layer wraps ClientError in
                                # S3UploadFailedError, see
                                # https://github.com/boto/boto3/blob/develop/boto3/s3/transfer.py#L377
                                handle_client_error(err, idx, result_file)
                                continue
                            except S3UploadFailedError as e:
                                err = convert_to_client_error(e)
                                handle_client_error(err, idx, result_file)
                                continue
                        except Exception:
                            # covers SSLError; assume anything else is transient
                            result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
                            result_file.flush()
                            continue
        except:
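            # Any uncaught exception is fatal for this worker: print the
            # traceback and exit with a distinctive error code.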
            traceback.print_exc()
            result_file.flush()
            sys.exit(ERROR_WORKER_EXCEPTION)
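
To make the calling contract concrete, here is a minimal driving harness. It
is a hypothetical sketch, not code from s3op.py: the S3Url and S3Config
namedtuples are stand-ins carrying just the attributes worker() reads, the
bucket and key names are invented, and it assumes get_s3_client tolerates
None for the role, session vars, and client params.

import json
from collections import namedtuple
from multiprocessing import Process, Queue

from metaflow.plugins.datatools.s3.s3op import worker

# Stand-ins for the tuples s3op.py hands to worker(); the field names
# cover exactly the attributes the function reads.
S3Url = namedtuple(
    "S3Url", "bucket path local range content_type metadata encryption"
)
S3Config = namedtuple("S3Config", "role session_vars client_params")

if __name__ == "__main__":
    queue = Queue()
    queue.put(
        (
            S3Url(
                bucket="my-bucket",  # invented bucket/key for illustration
                path="data/part-0",
                local="part-0",      # downloaded file lands here
                range=None,          # or e.g. "bytes=0-1023"
                content_type=None,
                metadata=None,
                encryption=None,
            ),
            0,  # idx: position of this task in the original request list
        )
    )
    queue.put((None, None))  # one shutdown sentinel per worker

    p = Process(
        target=worker,
        args=("result_0.txt", queue, "info_download", S3Config(None, None, None)),
    )
    p.start()
    p.join()

    # Parse the "<idx> <n>" lines the worker wrote.
    with open("result_0.txt") as f:
        for line in f:
            idx, n = (int(x) for x in line.split())
            if n >= 0:
                print("task %d ok (%d bytes)" % (idx, n))
                with open("part-0_meta") as meta:  # sidecar from pre_op_info
                    print(json.load(meta))
            else:
                print("task %d failed with error code %d" % (idx, -n))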