def _s3()

in pyiceberg/io/fsspec.py [0:0]


def _s3(properties: Properties) -> AbstractFileSystem:
    from s3fs import S3FileSystem

    client_kwargs = {
        "endpoint_url": properties.get(S3_ENDPOINT),
        "aws_access_key_id": get_first_property_value(properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
        "aws_secret_access_key": get_first_property_value(properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
        "aws_session_token": get_first_property_value(properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
        "region_name": get_first_property_value(properties, S3_REGION, AWS_REGION),
    }
    config_kwargs = {}
    register_events: Dict[str, Callable[[Properties], None]] = {}

    if signer := properties.get("s3.signer"):
        logger.info("Loading signer %s", signer)
        if signer_func := SIGNERS.get(signer):
            signer_func_with_properties = partial(signer_func, properties)
            register_events["before-sign.s3"] = signer_func_with_properties

            # Disable the AWS Signer
            from botocore import UNSIGNED

            config_kwargs["signature_version"] = UNSIGNED
        else:
            raise ValueError(f"Signer not available: {signer}")

    if proxy_uri := properties.get(S3_PROXY_URI):
        config_kwargs["proxies"] = {"http": proxy_uri, "https": proxy_uri}

    if connect_timeout := properties.get(S3_CONNECT_TIMEOUT):
        config_kwargs["connect_timeout"] = float(connect_timeout)

    if request_timeout := properties.get(S3_REQUEST_TIMEOUT):
        config_kwargs["read_timeout"] = float(request_timeout)

    fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs)

    for event_name, event_function in register_events.items():
        fs.s3.meta.events.register_last(event_name, event_function, unique_id=1925)

    return fs