in gslib/command.py [0:0]
def run(self):
  """Enumerates arguments and feeds them to the task queue as Tasks.

  Each argument yielded by self.args_iterator that passes
  self.arg_checker(self.cls, args) is wrapped in a Task and put on
  self.task_queue. Enqueueing deliberately lags one task behind
  enumeration: the newest Task is held back and only enqueued in the
  `finally` clause, after total_tasks[self.caller_id] has been set. This
  keeps workers from completing the last task before the total is known.
  If no argument was accepted, a Task carrying ZERO_TASKS_TO_DO_ARGUMENT
  is enqueued instead.

  Side activities:
    * When self.status_queue is set and an argument is a
      NameExpansionResult, CopyObjectInfo or RsyncDiffToApply, its byte
      size is added to a running total. On every 100th accepted task, and
      once more at the end with finished=True, a ProducerThreadMessage
      carrying the task count and that total is sent.
    * Once the accepted-task count reaches the value returned by
      _GetTaskEstimationThreshold(), a seek-ahead thread is started over
      self.seek_ahead_iterator (if one is set). A threshold <= 0 disables
      this. Any such thread is cancelled and joined at the end.

  Error handling:
    An Exception raised by the args iterator increments the failure count.
    If self.fail_on_error is set, it is stored in self.iterator_exception
    and enumeration stops. Otherwise it is passed to
    self.exception_handler and enumeration continues. Any other Exception
    raised during enumeration is stored in self.unknown_exception. In both
    cases the exception is recorded rather than propagated, and the
    `finally` cleanup still runs.
  """
  # Argument types whose byte size can be tallied for progress reporting.
  size_tracked_types = (NameExpansionResult, CopyObjectInfo, RsyncDiffToApply)
  num_tasks = 0
  total_size = 0
  cur_task = None
  last_task = None
  task_estimation_threshold = None
  seek_ahead_thread = None
  seek_ahead_thread_cancel_event = None
  seek_ahead_thread_considered = False
  args = None
  try:
    self.args_iterator = iter(self.args_iterator)
    while True:
      try:
        args = next(self.args_iterator)
      except StopIteration:
        break
      except Exception as e:  # pylint: disable=broad-except
        _IncrementFailureCount()
        if self.fail_on_error:
          # Recorded so the outer handler can tell this apart from an
          # unknown exception.
          self.iterator_exception = e
          raise
        try:
          self.exception_handler(self.cls, e)
        except Exception:  # pylint: disable=broad-except
          self.cls.logger.debug(
              'Caught exception while handling exception for %s:\n%s',
              self.func, traceback.format_exc())
        self.shared_variables_updater.Update(self.caller_id, self.cls)
        continue

      if not self.arg_checker(self.cls, args):
        continue
      num_tasks += 1

      if self.status_queue:
        # Tally this argument's size before any progress message, so that a
        # message reporting num_tasks tasks also includes this task's bytes.
        if isinstance(args, (NameExpansionResult, CopyObjectInfo)):
          if args.expanded_result:
            json_expanded_result = json.loads(args.expanded_result)
            if 'size' in json_expanded_result:
              total_size += int(json_expanded_result['size'])
        elif isinstance(args, RsyncDiffToApply):
          if args.copy_size:
            total_size += int(args.copy_size)
        if not num_tasks % 100 and isinstance(args, size_tracked_types):
          # Time to update the total number of tasks.
          PutToQueueWithTimeout(
              self.status_queue,
              ProducerThreadMessage(num_tasks, total_size, time.time()))

      if not seek_ahead_thread_considered:
        if task_estimation_threshold is None:
          task_estimation_threshold = _GetTaskEstimationThreshold()
        if task_estimation_threshold <= 0:
          # Disable the seek-ahead thread (never start it).
          seek_ahead_thread_considered = True
        elif num_tasks >= task_estimation_threshold:
          if self.seek_ahead_iterator:
            seek_ahead_thread_cancel_event = threading.Event()
            seek_ahead_thread = _StartSeekAheadThread(
                self.seek_ahead_iterator, seek_ahead_thread_cancel_event)
            # For integration testing only, force estimation to complete
            # prior to producing further results.
            if boto.config.get('GSUtil', 'task_estimation_force', None):
              seek_ahead_thread.join(timeout=SEEK_AHEAD_JOIN_TIMEOUT)
          # Starting the seek-ahead thread is only ever considered once.
          seek_ahead_thread_considered = True

      # Hold back the newest task and enqueue the previous one. The final
      # task is enqueued in the `finally` clause, after total_tasks is set.
      last_task = cur_task
      cur_task = Task(self.func, args, self.caller_id,
                      self.exception_handler, self.should_return_results,
                      self.arg_checker, self.fail_on_error)
      if last_task:
        self.task_queue.put(last_task)
  except Exception as e:  # pylint: disable=broad-except
    # This will also catch any exception raised due to an error in the
    # iterator when fail_on_error is set, so check that we failed for some
    # other reason before claiming that we had an unknown exception.
    if not self.iterator_exception:
      self.unknown_exception = e
  finally:
    # We need to make sure to update total_tasks[caller_id] before we enqueue
    # the last task. Otherwise, a worker can retrieve the last task and
    # complete it, then check total_tasks and determine that we're not done
    # producing all before we update total_tasks. This approach forces workers
    # to wait on the last task until after we've updated total_tasks.
    total_tasks[self.caller_id] = num_tasks
    if not cur_task:
      # This happens if there were zero arguments to be put in the queue.
      cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id, None,
                      None, None, None)
    self.task_queue.put(cur_task)

    # If the seek ahead thread is still running, cancel it and wait for it
    # to exit since we've enumerated all of the tasks already. We don't want
    # to delay command completion on an estimate that has become meaningless.
    if seek_ahead_thread is not None:
      seek_ahead_thread_cancel_event.set()
      # It's possible that the seek-ahead-thread may attempt to put to the
      # status queue after it has been torn down, for example if the system
      # is overloaded. Because the put uses a timeout, it should never block
      # command termination or signal handling.
      seek_ahead_thread.join(timeout=SEEK_AHEAD_JOIN_TIMEOUT)

    # Send a final ProducerThread message that definitively states
    # the amount of actual work performed.
    if self.status_queue and isinstance(args, size_tracked_types):
      PutToQueueWithTimeout(
          self.status_queue,
          ProducerThreadMessage(num_tasks,
                                total_size,
                                time.time(),
                                finished=True))

    # It's possible that the workers finished before we updated total_tasks,
    # so we need to check here as well.
    _NotifyIfDone(self.caller_id,
                  caller_id_finished_count.get(self.caller_id))