in python/pyarrow/_s3fs.pyx [0:0]
def __init__(self, *, access_key=None, secret_key=None, session_token=None,
             bint anonymous=False, region=None, request_timeout=None,
             connect_timeout=None, scheme=None, endpoint_override=None,
             bint background_writes=True, default_metadata=None,
             role_arn=None, session_name=None, external_id=None,
             load_frequency=900, proxy_options=None,
             allow_delayed_open=False,
             allow_bucket_creation=False, allow_bucket_deletion=False,
             check_directory_existence_before_creation=False,
             retry_strategy: S3RetryStrategy = AwsStandardS3RetryStrategy(
                 max_attempts=3),
             force_virtual_addressing=False):
    # Build a CS3Options from the keyword arguments, create the C++
    # CS3FileSystem from it and wrap it in this Python object.
    #
    # Credentials are selected with this precedence:
    #   1. explicit access_key + secret_key (+ optional session_token)
    #   2. anonymous=True
    #   3. role_arn (uses session_name, external_id and load_frequency)
    #   4. CS3Options.Defaults()
    # session_name, external_id and load_frequency are only read on the
    # role_arn path; they are ignored otherwise.
    #
    # proxy_options may be a dict or a URI string:
    #   - dict: the "scheme", "host" and "port" keys are required; the
    #     optional "username" / "password" are applied only when truthy.
    #   - str: parsed by CS3ProxyOptions.FromUriString.
    #
    # Raises:
    #   ValueError: only one of access_key / secret_key is given;
    #       session_token is given without both keys; explicit keys are
    #       combined with anonymous=True or role_arn; anonymous=True is
    #       combined with role_arn; retry_strategy is neither an
    #       AwsStandardS3RetryStrategy nor an AwsDefaultS3RetryStrategy.
    #   TypeError: proxy_options is neither a dict nor a str.
    #   Errors from CS3ProxyOptions.FromUriString / CS3FileSystem.Make are
    #   surfaced through GetResultValue.
    cdef:
        optional[CS3Options] options
        shared_ptr[CS3FileSystem] wrapped

    # Need to do this before initializing `options` as the S3Options
    # constructor has a debug check against use after S3 finalization.
    ensure_s3_initialized()

    # --- Credential validation and selection -------------------------------
    if access_key is not None and secret_key is None:
        raise ValueError(
            'In order to initialize with explicit credentials both '
            'access_key and secret_key must be provided, '
            '`secret_key` is not set.'
        )
    elif access_key is None and secret_key is not None:
        raise ValueError(
            'In order to initialize with explicit credentials both '
            'access_key and secret_key must be provided, '
            '`access_key` is not set.'
        )
    elif session_token is not None and (access_key is None or
                                        secret_key is None):
        raise ValueError(
            'In order to initialize a session with temporary credentials, '
            'both secret_key and access_key must be provided in addition '
            'to session_token.'
        )
    elif (access_key is not None or secret_key is not None):
        # Explicit keys are mutually exclusive with the other credential
        # sources.
        if anonymous:
            raise ValueError(
                'Cannot pass anonymous=True together with access_key '
                'and secret_key.')
        if role_arn:
            raise ValueError(
                'Cannot provide role_arn with access_key and secret_key')
        if session_token is None:
            # FromAccessKey always takes a token; an empty one means
            # "no session token".
            session_token = ""
        options = CS3Options.FromAccessKey(
            tobytes(access_key),
            tobytes(secret_key),
            tobytes(session_token)
        )
    elif anonymous:
        if role_arn:
            raise ValueError(
                'Cannot provide role_arn with anonymous=True')
        options = CS3Options.Anonymous()
    elif role_arn:
        if session_name is None:
            session_name = ''
        if external_id is None:
            external_id = ''
        options = CS3Options.FromAssumeRole(
            tobytes(role_arn),
            tobytes(session_name),
            tobytes(external_id),
            load_frequency
        )
    else:
        options = CS3Options.Defaults()

    # --- Optional connection settings (only overridden when given) ---------
    if region is not None:
        options.value().region = tobytes(region)
    if request_timeout is not None:
        options.value().request_timeout = request_timeout
    if connect_timeout is not None:
        options.value().connect_timeout = connect_timeout
    if scheme is not None:
        options.value().scheme = tobytes(scheme)
    if endpoint_override is not None:
        options.value().endpoint_override = tobytes(endpoint_override)
    # `background_writes` is a `bint`, so it can never be None; assign it
    # unconditionally (the former `is not None` guard was always true).
    options.value().background_writes = background_writes
    if default_metadata is not None:
        if not isinstance(default_metadata, KeyValueMetadata):
            default_metadata = KeyValueMetadata(default_metadata)
        options.value().default_metadata = pyarrow_unwrap_metadata(
            default_metadata)

    # --- Proxy configuration -----------------------------------------------
    if proxy_options is not None:
        if isinstance(proxy_options, dict):
            options.value().proxy_options.scheme = tobytes(
                proxy_options["scheme"])
            options.value().proxy_options.host = tobytes(
                proxy_options["host"])
            options.value().proxy_options.port = proxy_options["port"]
            proxy_username = proxy_options.get("username", None)
            if proxy_username:
                options.value().proxy_options.username = tobytes(
                    proxy_username)
            proxy_password = proxy_options.get("password", None)
            if proxy_password:
                options.value().proxy_options.password = tobytes(
                    proxy_password)
        elif isinstance(proxy_options, str):
            options.value().proxy_options = GetResultValue(
                CS3ProxyOptions.FromUriString(tobytes(proxy_options)))
        else:
            raise TypeError(
                "'proxy_options': expected 'dict' or 'str', "
                f"got {type(proxy_options)} instead.")

    # --- Behavioural flags -------------------------------------------------
    options.value().allow_delayed_open = allow_delayed_open
    options.value().allow_bucket_creation = allow_bucket_creation
    options.value().allow_bucket_deletion = allow_bucket_deletion
    options.value().check_directory_existence_before_creation = (
        check_directory_existence_before_creation)
    options.value().force_virtual_addressing = force_virtual_addressing

    # --- Retry strategy ----------------------------------------------------
    # The default `retry_strategy` instance is created once when the
    # function is defined and shared between calls; it is only read here
    # (max_attempts), never mutated.
    if isinstance(retry_strategy, AwsStandardS3RetryStrategy):
        options.value().retry_strategy = CS3RetryStrategy.GetAwsStandardRetryStrategy(
            retry_strategy.max_attempts)
    elif isinstance(retry_strategy, AwsDefaultS3RetryStrategy):
        options.value().retry_strategy = CS3RetryStrategy.GetAwsDefaultRetryStrategy(
            retry_strategy.max_attempts)
    else:
        raise ValueError(f'Invalid retry_strategy {retry_strategy!r}')

    # Construct the C++ filesystem with the GIL released.
    with nogil:
        wrapped = GetResultValue(CS3FileSystem.Make(options.value()))

    self.init(<shared_ptr[CFileSystem]> wrapped)