in s3transfer/manager.py [0:0]
def __init__(self, client, config=None, osutil=None, executor_cls=None):
"""A transfer manager interface for Amazon S3
:param client: Client to be used by the manager
:param config: TransferConfig to associate specific configurations
:param osutil: OSUtils object to use for os-related behavior when
using with transfer manager.
:type executor_cls: s3transfer.futures.BaseExecutor
:param executor_cls: The class of executor to use with the transfer
manager. By default, concurrent.futures.ThreadPoolExecutor is used.
"""
self._client = client
self._config = config
if config is None:
self._config = TransferConfig()
self._osutil = osutil
if osutil is None:
self._osutil = OSUtils()
self._coordinator_controller = TransferCoordinatorController()
# A counter to create unique id's for each transfer submitted.
self._id_counter = 0
# The executor responsible for making S3 API transfer requests
self._request_executor = BoundedExecutor(
max_size=self._config.max_request_queue_size,
max_num_threads=self._config.max_request_concurrency,
tag_semaphores={
IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
self._config.max_in_memory_upload_chunks
),
IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
self._config.max_in_memory_download_chunks
),
},
executor_cls=executor_cls,
)
# The executor responsible for submitting the necessary tasks to
# perform the desired transfer
self._submission_executor = BoundedExecutor(
max_size=self._config.max_submission_queue_size,
max_num_threads=self._config.max_submission_concurrency,
executor_cls=executor_cls,
)
# There is one thread available for writing to disk. It will handle
# downloads for all files.
self._io_executor = BoundedExecutor(
max_size=self._config.max_io_queue_size,
max_num_threads=1,
executor_cls=executor_cls,
)
# The component responsible for limiting bandwidth usage if it
# is configured.
self._bandwidth_limiter = None
if self._config.max_bandwidth is not None:
logger.debug(
'Setting max_bandwidth to %s', self._config.max_bandwidth
)
leaky_bucket = LeakyBucket(self._config.max_bandwidth)
self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)
self._register_handlers()