in s3transfer/tasks.py [0:0]
def _main(self, transfer_future, **kwargs):
    """Drive the transfer from queued to running and submit its tasks.

    The transfer is first marked as queued, every registered 'queued'
    callback is invoked, the transfer is marked as running and then
    ``_submit()`` is called. Any exception raised along the way is
    recorded instead of being re-raised to the caller.

    :type transfer_future: s3transfer.futures.TransferFuture
    :param transfer_future: The transfer future associated with the
        transfer request that tasks are being submitted for

    :param kwargs: Any additional kwargs that you may want to pass
        to the _submit() method
    """
    try:
        # The transfer stays in the queued state until every on_queued
        # callback has been invoked; no task is submitted before that.
        self._transfer_coordinator.set_status_to_queued()
        for notify_queued in get_callbacks(transfer_future, 'queued'):
            notify_queued()

        # All callbacks have run, so the transfer now counts as running.
        self._transfer_coordinator.set_status_to_running()

        # Hand over to _submit() to start submitting the tasks that
        # execute the transfer.
        self._submit(transfer_future=transfer_future, **kwargs)
    except BaseException as error:
        # A failure while submitting means the final task, the one that
        # signals the transfer is done and runs the cleanup, may never
        # have been submitted, so that work has to be done here instead.
        #
        # BaseException is caught rather than Exception because some
        # executor implementations, specifically the serial one, expose
        # the SubmissionTask directly to KeyboardInterrupts, and those
        # need the same cleanup and done signal.

        # Record the exception that caused the process to fail.
        self._log_and_set_exception(error)

        # Let every future that may have been spawned from this
        # submission task finish before the transfer is announced done.
        self._wait_for_all_submitted_futures_to_complete()

        # Announce the transfer as done, which will run any cleanups
        # and done callbacks as well.
        self._transfer_coordinator.announce_done()