def run()

in gslib/command.py


  def run(self):
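    # Produce tasks from args_iterator and enqueue them for worker threads,
    # tracking task counts and byte totals for status reporting and, for
    # large jobs, starting a seek-ahead thread to estimate total work.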
    num_tasks = 0
    cur_task = None
    last_task = None
    task_estimation_threshold = None
    seek_ahead_thread = None
    seek_ahead_thread_cancel_event = None
    seek_ahead_thread_considered = False
    args = None
    try:
      total_size = 0
      self.args_iterator = iter(self.args_iterator)
      while True:
        try:
          args = next(self.args_iterator)
        except StopIteration:
          break
        except Exception as e:  # pylint: disable=broad-except
          _IncrementFailureCount()
          if self.fail_on_error:
            self.iterator_exception = e
            raise
          else:
            try:
              self.exception_handler(self.cls, e)
            except Exception:  # pylint: disable=broad-except
              self.cls.logger.debug(
                  'Caught exception while handling exception for %s:\n%s',
                  self.func, traceback.format_exc())
            self.shared_variables_updater.Update(self.caller_id, self.cls)
            continue

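        # Only count and enqueue args that pass the command's argument check.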
        if self.arg_checker(self.cls, args):
          num_tasks += 1
          if self.status_queue:
            if not num_tasks % 100:
              # Time to update the total number of tasks.
              if isinstance(args, (NameExpansionResult, CopyObjectInfo,
                                   RsyncDiffToApply)):
                PutToQueueWithTimeout(
                    self.status_queue,
                    ProducerThreadMessage(num_tasks, total_size, time.time()))
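            # Accumulate the byte total used in the status messages above.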
            if isinstance(args, (NameExpansionResult, CopyObjectInfo)):
              if args.expanded_result:
                json_expanded_result = json.loads(args.expanded_result)
                if 'size' in json_expanded_result:
                  total_size += int(json_expanded_result['size'])
            elif isinstance(args, RsyncDiffToApply):
              if args.copy_size:
                total_size += int(args.copy_size)

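          # After enough tasks, consider starting a seek-ahead thread that
          # pre-scans the remaining work to estimate the total task and byte
          # counts for progress reporting.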
          if not seek_ahead_thread_considered:
            if task_estimation_threshold is None:
              task_estimation_threshold = _GetTaskEstimationThreshold()
            if task_estimation_threshold <= 0:
              # Disable the seek-ahead thread (never start it).
              seek_ahead_thread_considered = True
            elif num_tasks >= task_estimation_threshold:
              if self.seek_ahead_iterator:
                seek_ahead_thread_cancel_event = threading.Event()
                seek_ahead_thread = _StartSeekAheadThread(
                    self.seek_ahead_iterator, seek_ahead_thread_cancel_event)
                # For integration testing only, force estimation to complete
                # prior to producing further results.
                if boto.config.get('GSUtil', 'task_estimation_force', None):
                  seek_ahead_thread.join(timeout=SEEK_AHEAD_JOIN_TIMEOUT)

              seek_ahead_thread_considered = True

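          # Enqueue tasks one iteration behind: hold back the current task so
          # the final task can be enqueued in the finally block, after
          # total_tasks has been updated (see the race described there).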
          last_task = cur_task
          cur_task = Task(self.func, args, self.caller_id,
                          self.exception_handler, self.should_return_results,
                          self.arg_checker, self.fail_on_error)
          if last_task:
            self.task_queue.put(last_task)
    except Exception as e:  # pylint: disable=broad-except
      # This will also catch any exception raised due to an error in the
      # iterator when fail_on_error is set, so check that we failed for some
      # other reason before claiming that we had an unknown exception.
      if not self.iterator_exception:
        self.unknown_exception = e
    finally:
      # We need to update total_tasks[caller_id] before we enqueue the last
      # task. Otherwise, a worker could retrieve and complete the last task,
      # then check the still-stale total_tasks and conclude that production
      # isn't finished. Holding the last task back until total_tasks has been
      # updated closes that race.
      total_tasks[self.caller_id] = num_tasks
      if not cur_task:
        # This happens if there were zero arguments to be put in the queue.
        cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id, None,
                        None, None, None)
      self.task_queue.put(cur_task)

      # If the seek ahead thread is still running, cancel it and wait for it
      # to exit since we've enumerated all of the tasks already. We don't want
      # to delay command completion on an estimate that has become meaningless.
      if seek_ahead_thread is not None:
        seek_ahead_thread_cancel_event.set()
        # It's possible that the seek-ahead thread may attempt to put to the
        # status queue after it has been torn down, for example if the system
        # is overloaded. Because the put uses a timeout, it should never block
        # command termination or signal handling.
        seek_ahead_thread.join(timeout=SEEK_AHEAD_JOIN_TIMEOUT)
      # Send a final ProducerThread message that definitively states
      # the amount of actual work performed.
      if (self.status_queue and
          isinstance(args, (NameExpansionResult, CopyObjectInfo,
                            RsyncDiffToApply))):
        PutToQueueWithTimeout(
            self.status_queue,
            ProducerThreadMessage(num_tasks,
                                  total_size,
                                  time.time(),
                                  finished=True))

      # It's possible that the workers finished before we updated total_tasks,
      # so we need to check here as well.
      _NotifyIfDone(self.caller_id,
                    caller_id_finished_count.get(self.caller_id))
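
The hold-back-one enqueue above is the crux of the shutdown handshake: the
final task is released only after total_tasks[caller_id] is written. Below is
a minimal standalone sketch of the same pattern; produce, items, and
ZERO_TASKS_SENTINEL are hypothetical names for illustration, not gslib API:

import queue

ZERO_TASKS_SENTINEL = object()  # Stand-in for ZERO_TASKS_TO_DO_ARGUMENT.

def produce(items, task_queue, total_tasks, caller_id):
  cur_task = None
  num_tasks = 0
  for item in items:
    num_tasks += 1
    # Hold back the newest task; enqueue the previous one.
    last_task, cur_task = cur_task, item
    if last_task is not None:
      task_queue.put(last_task)
  # Publish the final count before releasing the held-back last task, so a
  # worker that completes it cannot observe a stale total.
  total_tasks[caller_id] = num_tasks
  task_queue.put(cur_task if cur_task is not None else ZERO_TASKS_SENTINEL)

task_queue = queue.Queue()
total_tasks = {}
produce(range(3), task_queue, total_tasks, caller_id='demo')
assert total_tasks['demo'] == 3
assert [task_queue.get() for _ in range(3)] == [0, 1, 2]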