# perfkitbenchmarker/background_tasks.py
# Copyright 2016 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Background tasks that propagate PKB thread context.
TODO(user): Many of the threading module flaws have been corrected in
Python 3. When PKB switches to Python 3, this module can be simplified.
PKB tries its best to clean up provisioned resources upon SIGINT. By default,
Python raises a KeyboardInterrupt upon a SIGINT, but none of the built-in
threading module classes are designed to handle a KeyboardInterrupt very well:
- threading.Lock has an atomic acquire method that cannot be interrupted and
hangs forever if the same thread tries to acquire twice. Its release method
can be called by any thread but raises thread.error if an unacquired Lock is
released.
- More complicated classes (threading.RLock, threading.Event, threading.Thread,
Queue.Queue) use internal Locks in such a way that a KeyboardInterrupt can
cause a thread that has acquired a Lock to jump out of its current action
without releasing the Lock. For example, in the below code, a
KeyboardInterrupt can be raised immediately after the acquire call but before
entering the try block:
lock.acquire()
try:
...
except:
lock.release()
Taken together, this means that there is a possibility to leave an internal Lock
acquired, and when later cleanup steps on the same or different thread attempt
to acquire the Lock, they will hang forever, unresponsive to even a second
KeyboardInterrupt. A KeyboardInterrupt during Thread.start() or Thread.join()
can even trigger an unbalanced acquire on a global lock used to keep track of
active threads, so that later attempts to start or join any Thread will hang
forever.
While it would take a significant and impractical redesign of PKB's code to
completely eliminate any risk of deadlock following a KeyboardInterrupt, the
code in this module is designed to allow interrupting parallel tasks while
keeping the risk of deadlock low.
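
A typical use from elsewhere in PKB might look like the following (an
illustrative sketch; the InstallBenchmark helper and the vms list are
hypothetical):

  from perfkitbenchmarker import background_tasks

  def InstallBenchmark(vm):
    vm.Install('fio')

  background_tasks.RunThreaded(InstallBenchmark, vms)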
"""
import abc
import collections
from concurrent import futures
import ctypes
import functools
import logging
import os
import queue
import signal
import threading
import time
import traceback
from absl import flags
from perfkitbenchmarker import context
from perfkitbenchmarker import errors
from perfkitbenchmarker import log_util
# For situations where an interruptible wait is necessary, a loop of waits with
# long timeouts is used instead. This is because some of Python's built-in wait
# methods are non-interruptible without a timeout.
_LONG_TIMEOUT = 1000.0
# Constants used for polling waits. See _WaitForCondition.
_WAIT_MIN_RECHECK_DELAY = 0.001 # 1 ms
_WAIT_MAX_RECHECK_DELAY = 0.050 # 50 ms
# Values sent to child threads that have special meanings.
_THREAD_STOP_PROCESSING = 0
_THREAD_WAIT_FOR_KEYBOARD_INTERRUPT = 1
# The default value for max_concurrent_threads.
MAX_CONCURRENT_THREADS = 200
# The default value is set in pkb.py. It is the greater of
# MAX_CONCURRENT_THREADS and the value passed to --num_vms. This is particularly
# important for the cluster_boot benchmark where we want to launch all of the
# VMs in parallel.
flags.DEFINE_integer(
'max_concurrent_threads',
None,
'Maximum number of concurrent threads to use when running a benchmark.',
)
FLAGS = flags.FLAGS
def _GetCallString(target_arg_tuple):
"""Returns the string representation of a function call."""
target, args, kwargs = target_arg_tuple
while isinstance(target, functools.partial):
args = target.args + args
inner_kwargs = target.keywords.copy()
inner_kwargs.update(kwargs)
kwargs = inner_kwargs
target = target.func
arg_strings = [str(a) for a in args]
arg_strings.extend(['{}={}'.format(k, v) for k, v in kwargs.items()])
return '{}({})'.format(
getattr(target, '__name__', target), ', '.join(arg_strings)
)
def _WaitForCondition(condition_callback, timeout=None):
"""Waits until the specified callback returns a value that evaluates True.
Similar to the threading.Condition.wait method that is the basis of most
threading class wait routines. Polls the condition, starting with frequent
checks but extending the delay between checks upon each failure.
Args:
condition_callback: Callable that returns a value that evaluates True to end
the wait or evaluates False to continue the wait.
timeout: Optional float. Number of seconds to wait before giving up. If
provided, the condition is still checked at least once before giving up.
If not provided, the wait does not time out.
Returns:
True if condition_callback returned a value that evaluated True. False if
condition_callback did not return a value that evaluated True before the
timeout.
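
  Example (an illustrative sketch):
    done = threading.Event()
    # Some other thread eventually calls done.set().
    if not _WaitForCondition(done.is_set, timeout=30.0):
      logging.warning('Timed out waiting for done to be set.')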
"""
deadline = None if timeout is None else time.time() + timeout
delay = _WAIT_MIN_RECHECK_DELAY
while True:
if condition_callback():
return True
remaining_time = (
_WAIT_MAX_RECHECK_DELAY if deadline is None else deadline - time.time()
)
if remaining_time <= 0:
return False
time.sleep(delay)
delay = min(delay * 2, remaining_time, _WAIT_MAX_RECHECK_DELAY)
class _SingleReaderQueue:
"""Queue to which multiple threads write but from which only one thread reads.
A lightweight substitute for the Queue.Queue class that does not use
internal Locks.
  Gets are interruptible but depend on polling.
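
  Example (illustrative):
    q = _SingleReaderQueue()
    q.Put('item')
    q.Get(timeout=1.0)  # Returns 'item'; raises queue.Empty if nothing arrives.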
"""
def __init__(self):
self._deque = collections.deque()
def Get(self, timeout=None):
if not _WaitForCondition(lambda: self._deque, timeout):
raise queue.Empty
return self._deque.popleft()
def Put(self, item):
self._deque.append(item)
class _NonPollingSingleReaderQueue:
"""Queue to which multiple threads write but from which only one thread reads.
  Uses a threading.Lock to implement a non-interruptible Get that does not poll
and is therefore easier on CPU usage. The reader waits for items by acquiring
the Lock, and writers release the Lock to signal that items have been written.
"""
def __init__(self):
self._deque = collections.deque()
self._lock = threading.Lock()
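    # Start with the lock held so that the first _WaitForItem call in Get
    # blocks until a Put releases the lock.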
self._lock.acquire()
def _WaitForItem(self):
self._lock.acquire()
def _SignalAvailableItem(self):
try:
self._lock.release()
except threading.ThreadError:
pass
def Get(self):
while True:
self._WaitForItem()
if self._deque:
item = self._deque.popleft()
if self._deque:
self._SignalAvailableItem()
return item
def Put(self, item):
self._deque.append(item)
self._SignalAvailableItem()
class _BackgroundTaskThreadContext:
"""Thread-specific information that can be inherited by a background task.
Attributes:
benchmark_spec: BenchmarkSpec of the benchmark currently being executed.
log_context: ThreadLogContext of the parent thread.
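
  Example (illustrative):
    thread_context = _BackgroundTaskThreadContext()  # On the parent thread.
    # ... later, on the child thread ...
    thread_context.CopyToCurrentThread()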
"""
def __init__(self):
self.benchmark_spec = context.GetThreadBenchmarkSpec()
self.log_context = log_util.GetThreadLogContext()
def CopyToCurrentThread(self):
"""Sets the thread context of the current thread."""
log_util.SetThreadLogContext(log_util.ThreadLogContext(self.log_context))
context.SetThreadBenchmarkSpec(self.benchmark_spec)
class _BackgroundTask:
"""Base class for a task executed in a child thread or process.
Attributes:
target: Function that is invoked in the child thread or process.
args: Series of unnamed arguments to be passed to the target.
kwargs: dict. Keyword arguments to be passed to the target.
context: _BackgroundTaskThreadContext. Thread-specific state to be inherited
from parent to child thread.
return_value: Return value if the call was executed successfully, or None
otherwise.
traceback: The traceback string if the call raised an exception, or None
otherwise.
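
  Example (illustrative):
    task = _BackgroundTask(max, (1, 2), {}, _BackgroundTaskThreadContext())
    task.Run()  # Typically invoked on the child thread or process.
    task.return_value  # -> 2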
"""
def __init__(self, target, args, kwargs, thread_context):
self.target = target
self.args = args
self.kwargs = kwargs
self.context = thread_context
self.return_value = None
self.traceback = None
def Run(self):
"""Sets the current thread context and executes the target."""
self.context.CopyToCurrentThread()
try:
self.return_value = self.target(*self.args, **self.kwargs)
except Exception:
self.traceback = traceback.format_exc()
class _BackgroundTaskManager(metaclass=abc.ABCMeta):
"""Base class for a context manager that manages state for background tasks.
Attributes:
tasks: list of _BackgroundTask instances. Contains one _BackgroundTask per
started task, in the order that they were started.
"""
def __init__(self, max_concurrency):
self._max_concurrency = max_concurrency
self.tasks = []
def __enter__(self):
return self
def __exit__(self, *unused_args, **unused_kwargs):
pass
@abc.abstractmethod
def StartTask(self, target, args, kwargs, thread_context):
"""Creates and starts a _BackgroundTask.
The created task is appended to self.tasks.
Args:
target: Function that is invoked in the child thread or process.
args: Series of unnamed arguments to be passed to the target.
kwargs: dict. Keyword arguments to be passed to the target.
thread_context: _BackgroundTaskThreadContext. Thread-specific state to be
inherited from parent to child thread.
"""
raise NotImplementedError()
@abc.abstractmethod
def AwaitAnyTask(self):
"""Waits for any of the started tasks to complete.
Returns:
int. Index of the task that completed in self.tasks.
"""
raise NotImplementedError()
@abc.abstractmethod
def HandleKeyboardInterrupt(self):
"""Called by the parent thread if a KeyboardInterrupt occurs.
Ensures that any child thread also receives a KeyboardInterrupt, and then
waits for each child thread to stop executing.
"""
raise NotImplementedError()
def _ExecuteBackgroundThreadTasks(worker_id, task_queue, response_queue):
"""Executes tasks received on a task queue.
Executed in a child Thread by _BackgroundThreadTaskManager.
Args:
worker_id: int. Identifier for the child thread relative to other child
threads.
task_queue: _NonPollingSingleReaderQueue. Queue from which input is read.
Each value in the queue can be one of three types of values. If it is a
(task_id, _BackgroundTask) pair, the task is executed on this thread. If
it is _THREAD_STOP_PROCESSING, the thread stops executing. If it is
_THREAD_WAIT_FOR_KEYBOARD_INTERRUPT, the thread waits for a
KeyboardInterrupt.
response_queue: _SingleReaderQueue. Queue to which output is written. It
receives worker_id when this thread's bootstrap code has completed and
receives a (worker_id, task_id) pair for each task completed on this
thread.
"""
try:
response_queue.Put(worker_id)
while True:
task_tuple = task_queue.Get()
if task_tuple == _THREAD_STOP_PROCESSING:
break
elif task_tuple == _THREAD_WAIT_FOR_KEYBOARD_INTERRUPT:
while True:
time.sleep(_WAIT_MAX_RECHECK_DELAY)
task_id, task = task_tuple
task.Run()
response_queue.Put((worker_id, task_id))
except KeyboardInterrupt:
# TODO(user): Detect when the log would be unhelpful (e.g. if the
# current thread was spinning in the _THREAD_WAIT_FOR_KEYBOARD_INTERRUPT
# sub-loop). Only log in helpful cases, like when the task is interrupted.
logging.debug(
'Child thread %s received a KeyboardInterrupt from its parent.',
worker_id,
exc_info=True,
)
class _BackgroundThreadTaskManager(_BackgroundTaskManager):
"""Manages state for background tasks started in child threads."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._response_queue = _SingleReaderQueue()
self._task_queues = []
self._threads = []
self._available_worker_ids = list(range(self._max_concurrency))
uninitialized_worker_ids = set(self._available_worker_ids)
for worker_id in self._available_worker_ids:
task_queue = _NonPollingSingleReaderQueue()
self._task_queues.append(task_queue)
thread = threading.Thread(
target=_ExecuteBackgroundThreadTasks,
args=(worker_id, task_queue, self._response_queue),
)
thread.daemon = True
self._threads.append(thread)
thread.start()
# Wait for each Thread to finish its bootstrap code. Starting all the
# threads upfront like this and reusing them for later calls minimizes the
# risk of a KeyboardInterrupt interfering with any of the Lock interactions.
for _ in self._threads:
worker_id = self._response_queue.Get()
uninitialized_worker_ids.remove(worker_id)
assert not uninitialized_worker_ids, uninitialized_worker_ids
def __exit__(self, *unused_args, **unused_kwargs):
# Shut down worker threads.
for task_queue in self._task_queues:
task_queue.Put(_THREAD_STOP_PROCESSING)
for thread in self._threads:
_WaitForCondition(lambda: not thread.is_alive())
def StartTask(self, target, args, kwargs, thread_context):
assert (
self._available_worker_ids
), 'StartTask called when no threads were available'
task = _BackgroundTask(target, args, kwargs, thread_context)
task_id = len(self.tasks)
self.tasks.append(task)
worker_id = self._available_worker_ids.pop()
self._task_queues[worker_id].Put((task_id, task))
def AwaitAnyTask(self):
worker_id, task_id = self._response_queue.Get()
self._available_worker_ids.append(worker_id)
return task_id
def HandleKeyboardInterrupt(self):
# Raise a KeyboardInterrupt in each child thread.
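    # Note: PyThreadState_SetAsyncExc only schedules the exception; the target
    # thread raises it the next time it executes Python bytecode, so a thread
    # blocked in a native wait will not see it until it wakes up.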
for thread in self._threads:
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), ctypes.py_object(KeyboardInterrupt)
)
    # Wake threads up from possible non-interruptible wait states so they can
    # actually see the KeyboardInterrupt.
    for task_queue in self._task_queues:
task_queue.Put(_THREAD_WAIT_FOR_KEYBOARD_INTERRUPT)
for thread in self._threads:
_WaitForCondition(lambda: not thread.is_alive())
def _ExecuteProcessTask(task):
"""Function invoked in another process by _BackgroundProcessTaskManager.
Executes a specified task function and returns the result or exception
traceback.
TODO(user): Rework this helper function when moving to Python 3.5 or
when the backport of concurrent.futures.ProcessPoolExecutor is able to
preserve original traceback.
Args:
task: _BackgroundTask to execute.
Returns:
(result, traceback) tuple. The first element is the return value from the
task function, or None if the function raised an exception. The second
element is the exception traceback string, or None if the function
succeeded.
"""
def handle_sigint(signum, frame):
# Ignore any new SIGINTs since we are already tearing down.
signal.signal(signal.SIGINT, signal.SIG_IGN)
# Execute the default SIGINT handler which throws a KeyboardInterrupt
# in the main thread of the process.
signal.default_int_handler(signum, frame)
signal.signal(signal.SIGINT, handle_sigint)
task.Run()
return task.return_value, task.traceback
class _BackgroundProcessTaskManager(_BackgroundTaskManager):
"""Manages states for background tasks started in child processes.
TODO(user): This class uses futures.ProcessPoolExecutor. We have been
using this executor since before issues regarding KeyboardInterrupt were
fully explored. The only consumer of this class is RunParallelProcesses, and
currently the uses for RunParallelProcesses are limited. In the future, this
class should also be redesigned for protection against KeyboardInterrupt.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._active_futures = {}
self._executor = futures.ProcessPoolExecutor(self._max_concurrency)
def __enter__(self):
self._executor.__enter__()
return self
def __exit__(self, *args, **kwargs):
    # Note: This invokes a non-interruptible wait.
return self._executor.__exit__(*args, **kwargs)
def StartTask(self, target, args, kwargs, thread_context):
task = _BackgroundTask(target, args, kwargs, thread_context)
task_id = len(self.tasks)
self.tasks.append(task)
future = self._executor.submit(_ExecuteProcessTask, task)
self._active_futures[future] = task_id
def AwaitAnyTask(self):
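    # Wait in a loop of long, finite timeouts rather than a single unbounded
    # wait so that this parent thread stays responsive to KeyboardInterrupt.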
completed_tasks = None
while not completed_tasks:
completed_tasks, _ = futures.wait(
self._active_futures,
timeout=_LONG_TIMEOUT,
return_when=futures.FIRST_COMPLETED,
)
future = completed_tasks.pop()
task_id = self._active_futures.pop(future)
task = self.tasks[task_id]
task.return_value, task.traceback = future.result()
return task_id
def HandleKeyboardInterrupt(self):
# If this thread received an interrupt signal, then processes started with
# a ProcessPoolExecutor will also have received an interrupt without any
# extra work needed from this class. Only need to wait for child processes.
    # Note: This invokes a non-interruptible wait.
self._executor.shutdown(wait=True)
def _RunParallelTasks(
target_arg_tuples,
max_concurrency,
get_task_manager,
parallel_exception_class,
post_task_delay=0,
):
"""Executes function calls concurrently in separate threads or processes.
Args:
target_arg_tuples: list of (target, args, kwargs) tuples. Each tuple
contains the function to call and the arguments to pass it.
max_concurrency: int or None. The maximum number of concurrent new threads
or processes.
    get_task_manager: Callable that accepts an int max_concurrency arg and
      returns a _BackgroundTaskManager.
parallel_exception_class: Type of exception to raise upon an exception in
one of the called functions.
post_task_delay: Delay in seconds between parallel task invocations.
Returns:
list of function return values in the order corresponding to the order of
target_arg_tuples.
Raises:
parallel_exception_class: When an exception occurred in any of the called
functions.
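
  Example (an illustrative sketch):
    results = _RunParallelTasks(
        [(max, (i, 5), {}) for i in range(10)],
        max_concurrency=4,
        get_task_manager=_BackgroundThreadTaskManager,
        parallel_exception_class=errors.VmUtil.ThreadException)
    # results == [5, 5, 5, 5, 5, 5, 6, 7, 8, 9]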
"""
thread_context = _BackgroundTaskThreadContext()
  # max_concurrency may be None (see the docstring). min(None, int) raises a
  # TypeError on Python 3, so default it to the processor count, matching
  # ProcessPoolExecutor's default, before clamping to the task count.
  if max_concurrency is None:
    max_concurrency = os.cpu_count() or 1
  max_concurrency = min(max_concurrency, len(target_arg_tuples))
error_strings = []
started_task_count = 0
active_task_count = 0
with get_task_manager(max_concurrency) as task_manager:
try:
while started_task_count < len(target_arg_tuples) or active_task_count:
if (
started_task_count < len(target_arg_tuples)
and active_task_count < max_concurrency
):
# Start a new task.
target, args, kwargs = target_arg_tuples[started_task_count]
task_manager.StartTask(target, args, kwargs, thread_context)
started_task_count += 1
active_task_count += 1
if post_task_delay:
time.sleep(post_task_delay)
continue
# Wait for a task to complete.
task_id = task_manager.AwaitAnyTask()
active_task_count -= 1
# If the task failed, it may still be a long time until all remaining
# tasks complete. Log the failure immediately before continuing to wait
# for other tasks.
stacktrace = task_manager.tasks[task_id].traceback
if stacktrace:
msg = 'Exception occurred while calling {}:{}{}'.format(
_GetCallString(target_arg_tuples[task_id]), os.linesep, stacktrace
)
logging.error(msg)
error_strings.append(msg)
except KeyboardInterrupt:
logging.error(
'Received KeyboardInterrupt while executing parallel tasks. Waiting '
'for %s tasks to clean up.',
active_task_count,
)
task_manager.HandleKeyboardInterrupt()
raise
if error_strings:
# TODO(user): Combine errors.VmUtil.ThreadException and
# errors.VmUtil.CalledProcessException so this can be a single exception
# type.
raise parallel_exception_class(
'The following exceptions occurred during parallel execution:'
'{}{}'.format(os.linesep, os.linesep.join(error_strings))
)
results = [task.return_value for task in task_manager.tasks]
assert len(target_arg_tuples) == len(results), (target_arg_tuples, results)
return results
def RunParallelThreads(target_arg_tuples, max_concurrency, post_task_delay=0):
"""Executes function calls concurrently in separate threads.
Args:
target_arg_tuples: list of (target, args, kwargs) tuples. Each tuple
contains the function to call and the arguments to pass it.
max_concurrency: int or None. The maximum number of concurrent new threads.
post_task_delay: Delay in seconds between parallel task invocations.
Returns:
list of function return values in the order corresponding to the order of
target_arg_tuples.
Raises:
errors.VmUtil.ThreadException: When an exception occurred in any of the
called functions.
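
  Example (illustrative):
    results = RunParallelThreads(
        [(round, (3.14159, 2), {}), (round, (2.71828,), {'ndigits': 1})],
        max_concurrency=2)
    # results == [3.14, 2.7]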
"""
return _RunParallelTasks(
target_arg_tuples,
max_concurrency,
_BackgroundThreadTaskManager,
errors.VmUtil.ThreadException,
post_task_delay,
)
def RunThreaded(
target, thread_params, max_concurrent_threads=None, post_task_delay=0
):
"""Runs the target method in parallel threads.
  Each thread invokes the target with one item from thread_params as its first
  argument.
Args:
target: The method to invoke in the thread.
    thread_params: A thread is launched for each value in the list. Each item
      in the list can be either a single value or an (args, kwargs) tuple/list.
      Usually this is a list of VMs.
max_concurrent_threads: The maximum number of concurrent threads to allow.
post_task_delay: Delay in seconds between commands.
Returns:
List of the same length as thread_params. Contains the return value from
each threaded function call in the corresponding order as thread_params.
Raises:
ValueError: when thread_params is not valid.
errors.VmUtil.ThreadException: When an exception occurred in any of the
called functions.
Example 1: # no args other than list.
args = [self.CreateVm()
for x in range(0, 10)]
RunThreaded(MyThreadedTargetMethod, args)
Example 2: # using args only to pass to the thread:
args = [((self.CreateVm(), i, 'somestring'), {})
for i in range(0, 10)]
RunThreaded(MyThreadedTargetMethod, args)
Example 3: # using args & kwargs to pass to the thread:
args = [((self.CreateVm(),), {'num': i, 'name': 'somestring'})
for i in range(0, 10)]
RunThreaded(MyThreadedTargetMethod, args)
"""
if max_concurrent_threads is None:
max_concurrent_threads = (
FLAGS.max_concurrent_threads or MAX_CONCURRENT_THREADS
)
if not isinstance(thread_params, list):
raise ValueError('Param "thread_params" must be a list')
if not thread_params:
# Nothing to do.
return []
if not isinstance(thread_params[0], tuple):
target_arg_tuples = [(target, (arg,), {}) for arg in thread_params]
elif not isinstance(thread_params[0][0], tuple) or not isinstance(
thread_params[0][1], dict
):
raise ValueError('If Param is a tuple, the tuple must be (tuple, dict)')
else:
target_arg_tuples = [
(target, args, kwargs) for args, kwargs in thread_params
]
return RunParallelThreads(
target_arg_tuples,
max_concurrency=max_concurrent_threads,
post_task_delay=post_task_delay,
)
def RunParallelProcesses(
target_arg_tuples, max_concurrency, post_process_delay=0
):
"""Executes function calls concurrently in separate processes.
Args:
target_arg_tuples: list of (target, args, kwargs) tuples. Each tuple
contains the function to call and the arguments to pass it.
max_concurrency: int or None. The maximum number of concurrent new
processes. If None, it will default to the number of processors on the
machine.
post_process_delay: Delay in seconds between parallel process invocations.
Returns:
list of function return values in the order corresponding to the order of
target_arg_tuples.
Raises:
errors.VmUtil.CalledProcessException: When an exception occurred in any
of the called functions.
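
  Example (illustrative; targets must be picklable, e.g. builtins or
  module-level functions):
    results = RunParallelProcesses(
        [(abs, (-1,), {}), (abs, (-2,), {})], max_concurrency=2)
    # results == [1, 2]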
"""
def handle_sigint(signum, frame):
    # Ignore any SIGINTs in the parent process, but let users know
# that the child processes are getting cleaned up.
logging.error(
'Got SIGINT while executing parallel tasks. '
'Waiting for tasks to clean up.'
)
old_handler = None
try:
old_handler = signal.signal(signal.SIGINT, handle_sigint)
ret_val = _RunParallelTasks(
target_arg_tuples,
max_concurrency,
_BackgroundProcessTaskManager,
errors.VmUtil.CalledProcessException,
post_task_delay=post_process_delay,
)
finally:
    # Compare against None rather than truthiness: the previous handler may be
    # the falsy signal.SIG_DFL, which must still be restored.
    if old_handler is not None:
signal.signal(signal.SIGINT, old_handler)
return ret_val