in gslib/command.py [0:0]
def _ParallelApply(self,
                   func,
                   args_iterator,
                   exception_handler,
                   caller_id,
                   arg_checker,
                   process_count,
                   thread_count,
                   should_return_results,
                   fail_on_error,
                   seek_ahead_iterator=None,
                   parallel_operations_override=None):
  r"""Dispatches input arguments across a thread/process pool.

  Pools are composed of parallel OS processes and/or Python threads,
  based on options (-m or not) and settings in the user's config file.

  If only one OS process is requested/available, dispatch requests across
  threads in the current OS process.

  In the multi-process case, we will create one pool of worker processes for
  each level of the tree of recursive calls to Apply. E.g., if A calls
  Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we
  will only create two sets of worker processes - B will execute in the first,
  and C and D will execute in the second. If C is then changed to call
  Apply(E) and D is changed to call Apply(F), then we will automatically
  create a third set of processes (lazily, when needed) that will be used to
  execute calls to E and F. This might look something like:

  Pool1 Executes:                B
                                / \
  Pool2 Executes:              C   D
                              /     \
  Pool3 Executes:            E       F

  Apply's parallelism is generally broken up into 4 cases:
  - If process_count == thread_count == 1, then all tasks will be executed
    by _SequentialApply.
  - If process_count > 1 and thread_count == 1, then the main thread will
    create a new pool of processes (if they don't already exist) and each of
    those processes will execute the tasks in a single thread.
  - If process_count == 1 and thread_count > 1, then this process will create
    a new pool of threads to execute the tasks.
  - If process_count > 1 and thread_count > 1, then the main thread will
    create a new pool of processes (if they don't already exist) and each of
    those processes will, upon creation, create a pool of threads to
    execute the tasks.

  Args:
    func: Forwarded unchanged to the ProducerThread; see command.Apply.
    args_iterator: Forwarded unchanged to the ProducerThread; see
        command.Apply.
    exception_handler: Forwarded unchanged to the ProducerThread; see
        command.Apply.
    caller_id: The caller ID unique to this call to command.Apply. It is also
        the key polled in call_completed_map to detect that the work is done.
    arg_checker: Forwarded unchanged to the ProducerThread; see command.Apply.
    process_count: When > 1 the process-pool path is taken; otherwise tasks
        are dispatched to thread pools in the current process.
    thread_count: Thread count handed to WorkerPool and
        _CreateNewConsumerPool.
    should_return_results: Forwarded unchanged to the ProducerThread; see
        command.Apply.
    fail_on_error: Forwarded to the ProducerThread. When true, an exception
        the producer recorded while iterating over the arguments is re-raised
        here once the work has completed.
    seek_ahead_iterator: Only honored for the top-level call
        (recursive_apply_level == 0); dropped for nested calls.
    parallel_operations_override: When equal to
        ParallelOverrideReason.SLICE (and recursive_apply_level <= 1) a
        PerformanceSummaryMessage is queued. Any truthy value suppresses the
        FinalMessage normally queued by the top-level call.

    See command.Apply for a fuller description of the forwarded arguments.

  Returns:
    None; this method contains no return statement.
    NOTE(review): results requested via should_return_results are presumably
    collected by the caller - confirm against command.Apply.

  Raises:
    Whatever the producer thread stored in unknown_exception, or, when
    fail_on_error is true, what it stored in iterator_exception.
  """
  # This is initialized in Initialize(Multiprocessing|Threading)Variables
  # pylint: disable=global-variable-not-assigned
  # pylint: disable=global-variable-undefined
  global glob_status_queue, ui_controller
  # pylint: enable=global-variable-not-assigned
  # pylint: enable=global-variable-undefined

  def start_thread_pool(queue):
    # Single construction site for the in-process thread pools so that the
    # top-level pool and the per-recursion-level pools are always configured
    # identically. The WorkerPool object itself is intentionally not kept.
    WorkerPool(thread_count,
               self.logger,
               task_queue=queue,
               bucket_storage_uri_class=self.bucket_storage_uri_class,
               gsutil_api_map=self.gsutil_api_map,
               debug=self.debug,
               status_queue=glob_status_queue,
               headers=self.non_metadata_headers,
               perf_trace_token=self.perf_trace_token,
               trace_token=self.trace_token,
               user_project=self.user_project)

  is_main_thread = self.recursive_apply_level == 0

  if (parallel_operations_override == self.ParallelOverrideReason.SLICE and
      self.recursive_apply_level <= 1):
    # The operation uses slice parallelism if the recursive apply level > 0 or
    # if we're executing _ParallelApply without the -m option.
    glob_status_queue.put(PerformanceSummaryMessage(time.time(), True))

  if not IS_WINDOWS and is_main_thread:
    # For multi-thread or multi-process scenarios, the main process must
    # kill itself on a terminating signal, because sys.exit(1) only exits
    # the currently executing thread, leaving orphaned processes. The main
    # thread is responsible for cleaning up multiprocessing variables such
    # as manager processes. Therefore, the main thread's signal handling
    # chain is:
    # 1: __main__._CleanupSignalHandler (clean up processes)
    # 2: MultithreadedSignalHandler (kill self)
    for signal_num in (signal.SIGINT, signal.SIGTERM):
      RegisterSignalHandler(signal_num,
                            MultithreadedMainSignalHandler,
                            is_final_handler=True)

  if not task_queues:
    # The process we create will need to access the next recursive level
    # of task queues if it makes a call to Apply, so we always keep around
    # one more queue than we know we need. OTOH, if we don't create a new
    # process, the existing process still needs a task queue to use.
    if process_count > 1:
      task_queues.append(_NewMultiprocessingQueue())
    else:
      task_queue = _NewThreadsafeQueue()
      task_queues.append(task_queue)
      # Create a top-level worker pool since this is the first execution
      # of ParallelApply on the main thread.
      start_thread_pool(task_queue)

  if process_count > 1:  # Handle process pool creation.
    # Check whether this call will need a new set of workers.

    # Each worker must acquire a shared lock before notifying the main thread
    # that it needs a new worker pool, so that at most one worker asks for
    # a new worker pool at once.
    #
    # The lock is acquired before entering the try block so that a failing
    # acquire() cannot lead to release() being called on a lock this thread
    # never held (which would mask the original error).
    if not is_main_thread:
      worker_checking_level_lock.acquire()
    try:
      if self.recursive_apply_level >= current_max_recursive_level.GetValue():
        with need_pool_or_done_cond:
          # Only the main thread is allowed to create new processes -
          # otherwise, we will run into some Python bugs.
          if is_main_thread:
            self._CreateNewConsumerPool(process_count, thread_count,
                                        glob_status_queue)
          else:
            # Notify the main thread that we need a new consumer pool.
            new_pool_needed.Reset(reset_value=1)
            need_pool_or_done_cond.notify_all()
            # The main thread will notify us when it finishes.
            need_pool_or_done_cond.wait()
    finally:
      if not is_main_thread:
        worker_checking_level_lock.release()
  else:  # Handle new worker thread pool creation.
    if not is_main_thread:
      # As above, acquire first and only then guard the release with
      # try/finally.
      worker_checking_level_lock.acquire()
      try:
        if self.recursive_apply_level > _GetCurrentMaxRecursiveLevel():
          # We don't have a thread pool for this level of recursive apply
          # calls, so create a pool and corresponding task queue.
          _IncrementCurrentMaxRecursiveLevel()
          task_queue = _NewThreadsafeQueue()
          task_queues.append(task_queue)
          start_thread_pool(task_queue)
      finally:
        worker_checking_level_lock.release()

  # Every call at a given recursion depth shares that depth's queue.
  task_queue = task_queues[self.recursive_apply_level]

  # Only use the seek-ahead iterator in the main thread to provide an
  # overall estimate of operations.
  if seek_ahead_iterator and not is_main_thread:
    seek_ahead_iterator = None

  # Kick off a producer thread to throw tasks in the global task queue. We
  # do this asynchronously so that the main thread can be free to create new
  # consumer pools when needed (otherwise, any thread with a task that needs
  # a new consumer pool must block until we're completely done producing; in
  # the worst case, every worker blocks on such a call and the producer fills
  # up the task queue before it finishes, so we block forever).
  producer_thread = ProducerThread(
      copy.copy(self),
      args_iterator,
      caller_id,
      func,
      task_queue,
      should_return_results,
      exception_handler,
      arg_checker,
      fail_on_error,
      seek_ahead_iterator=seek_ahead_iterator,
      status_queue=(glob_status_queue if is_main_thread else None))

  # Start the UI thread that is responsible for displaying operation status
  # (aggregated across processes and threads) to the user.
  ui_thread = None
  if is_main_thread:
    ui_thread = UIThread(glob_status_queue, sys.stderr, ui_controller)

  # Wait here until either:
  #   1. We're the main thread in the multi-process case, and someone needs
  #      a new consumer pool - in which case we create one and continue
  #      waiting.
  #   2. Someone notifies us that all of the work we requested is done, in
  #      which case we retrieve the results (if applicable) and stop
  #      waiting.
  # At most one of these can be true, because the main thread is blocked on
  # its call to Apply, and a thread will not ask for a new consumer pool
  # unless it had more work to do.
  while True:
    with need_pool_or_done_cond:
      if call_completed_map[caller_id]:
        break
      elif (process_count > 1 and is_main_thread and
            new_pool_needed.GetValue()):
        new_pool_needed.Reset()
        self._CreateNewConsumerPool(process_count, thread_count,
                                    glob_status_queue)
        need_pool_or_done_cond.notify_all()

      # Note that we must check the above conditions before the wait() call;
      # otherwise, the notification can happen before we start waiting, in
      # which case we'll block forever.
      need_pool_or_done_cond.wait()

  # We've completed all tasks (or excepted), so signal the UI thread to
  # terminate.
  if is_main_thread:
    PutToQueueWithTimeout(glob_status_queue, ZERO_TASKS_TO_DO_ARGUMENT)
    ui_thread.join(timeout=UI_THREAD_JOIN_TIMEOUT)
  # Now that all the work is done, log the types of source URLs encountered.
  self._ProcessSourceUrlTypes(producer_thread.args_iterator)

  # We encountered an exception from the producer thread before any arguments
  # were enqueued, but it wouldn't have been propagated, so we'll now
  # explicitly raise it here.
  if producer_thread.unknown_exception:
    # pylint: disable=raising-bad-type
    raise producer_thread.unknown_exception

  # We encountered an exception from the producer thread while iterating over
  # the arguments, so raise it here if we're meant to fail on error.
  if producer_thread.iterator_exception and fail_on_error:
    # pylint: disable=raising-bad-type
    raise producer_thread.iterator_exception

  # Only the top-level, non-overridden call announces overall completion.
  if is_main_thread and not parallel_operations_override:
    PutToQueueWithTimeout(glob_status_queue, FinalMessage(time.time()))