def _ParallelApply()

in gslib/command.py


  def _ParallelApply(self,
                     func,
                     args_iterator,
                     exception_handler,
                     caller_id,
                     arg_checker,
                     process_count,
                     thread_count,
                     should_return_results,
                     fail_on_error,
                     seek_ahead_iterator=None,
                     parallel_operations_override=None):
    r"""Dispatches input arguments across a thread/process pool.

    Pools are composed of parallel OS processes and/or Python threads,
    based on options (-m or not) and settings in the user's config file.

    If only one OS process is requested/available, dispatch requests across
    threads in the current OS process.

    In the multi-process case, we will create one pool of worker processes for
    each level of the tree of recursive calls to Apply. E.g., if A calls
    Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we
    will only create two sets of worker processes - B will execute in the first,
    and C and D will execute in the second. If C is then changed to call
    Apply(E) and D is changed to call Apply(F), then we will automatically
    create a third set of processes (lazily, when needed) that will be used to
    execute calls to E and F. This might look something like:

    Pool1 Executes:                B
                                  / \
    Pool2 Executes:              C   D
                                /     \
    Pool3 Executes:            E       F

    Apply's parallelism is generally broken up into 4 cases:
    - If process_count == thread_count == 1, then all tasks will be executed
      by _SequentialApply.
    - If process_count > 1 and thread_count == 1, then the main thread will
      create a new pool of processes (if they don't already exist) and each of
      those processes will execute the tasks in a single thread.
    - If process_count == 1 and thread_count > 1, then this process will create
      a new pool of threads to execute the tasks.
    - If process_count > 1 and thread_count > 1, then the main thread will
      create a new pool of processes (if they don't already exist) and each of
      those processes will, upon creation, create a pool of threads to
      execute the tasks.

    Args:
      caller_id: The caller ID unique to this call to command.Apply.
      See command.Apply for description of other arguments.
    """
    # This is initialized in Initialize(Multiprocessing|Threading)Variables
    # pylint: disable=global-variable-not-assigned
    # pylint: disable=global-variable-undefined
    global glob_status_queue, ui_controller
    # pylint: enable=global-variable-not-assigned
    # pylint: enable=global-variable-undefined
    is_main_thread = self.recursive_apply_level == 0
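    # recursive_apply_level is 0 only for the outermost call to Apply, so
    # is_main_thread is True only on the original thread of the main process.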

    if (parallel_operations_override == self.ParallelOverrideReason.SLICE and
        self.recursive_apply_level <= 1):
      # The operation uses slice parallelism if the recursive apply level > 0 or
      # if we're executing _ParallelApply without the -m option.
      glob_status_queue.put(PerformanceSummaryMessage(time.time(), True))

    if not IS_WINDOWS and is_main_thread:
      # For multi-thread or multi-process scenarios, the main process must
      # kill itself on a terminating signal, because sys.exit(1) only exits
      # the currently executing thread, leaving orphaned processes. The main
      # thread is responsible for cleaning up multiprocessing variables such
      # as manager processes. Therefore, the main thread's signal handling
      # chain is:
      # 1: __main__._CleanupSignalHandler (clean up processes)
      # 2: MultithreadedMainSignalHandler (kill self)
      for signal_num in (signal.SIGINT, signal.SIGTERM):
        RegisterSignalHandler(signal_num,
                              MultithreadedMainSignalHandler,
                              is_final_handler=True)

    if not task_queues:
      # The process we create will need to access the next recursive level
      # of task queues if it makes a call to Apply, so we always keep around
      # one more queue than we know we need. OTOH, if we don't create a new
      # process, the existing process still needs a task queue to use.
      if process_count > 1:
        task_queues.append(_NewMultiprocessingQueue())
      else:
        task_queue = _NewThreadsafeQueue()
        task_queues.append(task_queue)
        # Create a top-level worker pool since this is the first execution
        # of ParallelApply on the main thread.
        WorkerPool(thread_count,
                   self.logger,
                   task_queue=task_queue,
                   bucket_storage_uri_class=self.bucket_storage_uri_class,
                   gsutil_api_map=self.gsutil_api_map,
                   debug=self.debug,
                   status_queue=glob_status_queue,
                   headers=self.non_metadata_headers,
                   perf_trace_token=self.perf_trace_token,
                   trace_token=self.trace_token,
                   user_project=self.user_project)
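        # No reference to the pool is kept; the pool is expected to start its
        # worker threads, which consume from task_queue, as part of its
        # construction.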

    if process_count > 1:  # Handle process pool creation.
      # Check whether this call will need a new set of workers.

      # Each worker must acquire a shared lock before notifying the main thread
      # that it needs a new worker pool, so that at most one worker asks for
      # a new worker pool at once.
      try:
        if not is_main_thread:
          worker_checking_level_lock.acquire()
        if self.recursive_apply_level >= current_max_recursive_level.GetValue():
          with need_pool_or_done_cond:
            # Only the main thread is allowed to create new processes -
            # otherwise, we will run into some Python bugs.
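            # (Creating processes from a non-main thread can deadlock or
            # otherwise misbehave in Python's multiprocessing machinery.)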
            if is_main_thread:
              self._CreateNewConsumerPool(process_count, thread_count,
                                          glob_status_queue)
            else:
              # Notify the main thread that we need a new consumer pool.
              new_pool_needed.Reset(reset_value=1)
              need_pool_or_done_cond.notify_all()
              # The main thread will notify us when it finishes.
              need_pool_or_done_cond.wait()
      finally:
        if not is_main_thread:
          worker_checking_level_lock.release()
    else:  # Handle new worker thread pool creation.
      if not is_main_thread:
        try:
          worker_checking_level_lock.acquire()
          if self.recursive_apply_level > _GetCurrentMaxRecursiveLevel():
            # We don't have a thread pool for this level of recursive apply
            # calls, so create a pool and corresponding task queue.
            _IncrementCurrentMaxRecursiveLevel()
            task_queue = _NewThreadsafeQueue()
            task_queues.append(task_queue)
            WorkerPool(thread_count,
                       self.logger,
                       task_queue=task_queue,
                       bucket_storage_uri_class=self.bucket_storage_uri_class,
                       gsutil_api_map=self.gsutil_api_map,
                       debug=self.debug,
                       status_queue=glob_status_queue,
                       headers=self.non_metadata_headers,
                       perf_trace_token=self.perf_trace_token,
                       trace_token=self.trace_token,
                       user_project=self.user_project)
        finally:
          worker_checking_level_lock.release()

    task_queue = task_queues[self.recursive_apply_level]
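    # Each level of recursive Apply calls has its own task queue, so the tasks
    # produced by this call are consumed only by the worker pool serving this
    # recursion level.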

    # Only use the seek-ahead iterator in the main thread to provide an
    # overall estimate of operations.
    if seek_ahead_iterator and not is_main_thread:
      seek_ahead_iterator = None

    # Kick off a producer thread to throw tasks in the global task queue. We
    # do this asynchronously so that the main thread can be free to create new
    # consumer pools when needed (otherwise, any thread with a task that needs
    # a new consumer pool must block until we're completely done producing; in
    # the worst case, every worker blocks on such a call and the producer fills
    # up the task queue before it finishes, so we block forever).
    producer_thread = ProducerThread(
        copy.copy(self),
        args_iterator,
        caller_id,
        func,
        task_queue,
        should_return_results,
        exception_handler,
        arg_checker,
        fail_on_error,
        seek_ahead_iterator=seek_ahead_iterator,
        status_queue=(glob_status_queue if is_main_thread else None))
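    # Like WorkerPool above, ProducerThread is expected to begin running as
    # part of its construction; the reference is kept only so that its
    # exceptions and args_iterator can be examined below.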

    # Start the UI thread that is responsible for displaying operation status
    # (aggregated across processes and threads) to the user.
    ui_thread = None
    if is_main_thread:
      ui_thread = UIThread(glob_status_queue, sys.stderr, ui_controller)

    # Wait here until either:
    #   1. We're the main thread in the multi-process case, and someone needs
    #      a new consumer pool - in which case we create one and continue
    #      waiting.
    #   2. Someone notifies us that all of the work we requested is done, in
    #      which case we retrieve the results (if applicable) and stop
    #      waiting.
    # At most one of these can be true, because the main thread is blocked on
    # its call to Apply, and a thread will not ask for a new consumer pool
    # unless it has more work to do.
    while True:
      with need_pool_or_done_cond:
        if call_completed_map[caller_id]:
          break
        elif (process_count > 1 and is_main_thread and
              new_pool_needed.GetValue()):
          new_pool_needed.Reset()
          self._CreateNewConsumerPool(process_count, thread_count,
                                      glob_status_queue)
          need_pool_or_done_cond.notify_all()

        # Note that we must check the above conditions before the wait() call;
        # otherwise, the notification can happen before we start waiting, in
        # which case we'll block forever.
        need_pool_or_done_cond.wait()

    # We've completed all tasks (or excepted), so signal the UI thread to
    # terminate.
    if is_main_thread:
      PutToQueueWithTimeout(glob_status_queue, ZERO_TASKS_TO_DO_ARGUMENT)
      ui_thread.join(timeout=UI_THREAD_JOIN_TIMEOUT)
      # Now that all the work is done, log the types of source URLs encountered.
      self._ProcessSourceUrlTypes(producer_thread.args_iterator)

    # We encountered an exception from the producer thread before any arguments
    # were enqueued, but it wouldn't have been propagated, so we'll now
    # explicitly raise it here.
    if producer_thread.unknown_exception:
      # pylint: disable=raising-bad-type
      raise producer_thread.unknown_exception

    # We encountered an exception from the producer thread while iterating over
    # the arguments, so raise it here if we're meant to fail on error.
    if producer_thread.iterator_exception and fail_on_error:
      # pylint: disable=raising-bad-type
      raise producer_thread.iterator_exception
    if is_main_thread and not parallel_operations_override:
      PutToQueueWithTimeout(glob_status_queue, FinalMessage(time.time()))
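
The wait loop above is the classic condition-variable idiom: re-check the
completion state before every wait() so that a notification arriving between
the check and the wait cannot be lost. Below is a minimal, self-contained
sketch of that producer/worker/condition-variable pattern for the
single-process, multi-thread case. It is an illustration only, not gsutil
code: run_parallel, work, and the state dict are hypothetical names, and the
real _ParallelApply layers per-recursion-level queues, process pools, status
reporting, and exception propagation on top of this skeleton.

import queue
import threading


def work(arg):
  """Hypothetical stand-in for the function passed to Apply."""
  return arg * arg


def run_parallel(args, thread_count=4):
  """Dispatches args across a thread pool and waits for all results."""
  task_queue = queue.Queue()
  results = []
  done_cond = threading.Condition()
  state = {'produced': None, 'completed': 0}

  def producer():
    # Enqueue work asynchronously so the coordinating thread below stays free.
    count = 0
    for arg in args:
      task_queue.put(arg)
      count += 1
    with done_cond:
      state['produced'] = count
      done_cond.notify_all()

  def worker():
    while True:
      arg = task_queue.get()
      if arg is None:  # Poison pill: no more work for this worker.
        return
      result = work(arg)
      with done_cond:
        results.append(result)
        state['completed'] += 1
        done_cond.notify_all()

  for _ in range(thread_count):
    threading.Thread(target=worker, daemon=True).start()
  threading.Thread(target=producer, daemon=True).start()

  # Mirror _ParallelApply's wait loop: check the completion condition before
  # every wait() so a notification sent just before we block is never missed.
  with done_cond:
    while state['produced'] is None or state['completed'] < state['produced']:
      done_cond.wait()

  for _ in range(thread_count):
    task_queue.put(None)  # Tell each worker thread to exit.
  return results


if __name__ == '__main__':
  print(sorted(run_parallel(range(10))))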