in petastorm/workers_pool/process_pool.py [0:0]
def _worker_bootstrap(worker_class, worker_id, control_socket, worker_receiver_socket, results_sender_socket,
                      main_process_pid, serializer, worker_args):
    """This is the root of the spawned worker processes.

    :param worker_class: A class with the worker implementation.
    :param worker_id: An integer. Unique for each worker.
    :param control_socket: A zmq socket used to control the worker (currently the only control message is
      ``_CONTROL_FINISHED``)
    :param worker_receiver_socket: A zmq socket used to deliver tasks to the worker
    :param results_sender_socket: A zmq socket used to deliver the work products to the consumer
    :param main_process_pid: Process id of the main process. A monitor thread watches it so the worker can exit
      if the main process dies.
    :param serializer: A serializer object (with serialize/deserialize methods) or ``None``.
    :param worker_args: Application-specific parameters passed to the worker constructor
    :return: ``None``
    """
    logger.debug('Starting _worker_bootstrap')
    context = zmq.Context()

    logger.debug('Connecting sockets')
    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
    work_receiver.linger = 0
    work_receiver.connect(worker_receiver_socket)

    # Set up a channel to send result of work to the results reporter
    results_sender = context.socket(zmq.PUSH)
    results_sender.linger = 0
    results_sender.connect(results_sender_socket)

    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)
    control_receiver.linger = 0
    control_receiver.connect(control_socket)
    _setsockopt(control_receiver, zmq.SUBSCRIBE, b"")

    logger.debug('Setting up poller')
    # Set up a poller to multiplex the work receiver and control receiver channels
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    poller.register(control_receiver, zmq.POLLIN)

    results_sender.send_pyobj(_WORKER_STARTED_INDICATOR)

    # Use this 'none_marker' as the first argument to send_multipart.
    none_marker = bytes()
    logger.debug('Instantiating a worker')
    # Instantiate a worker
    worker = worker_class(worker_id, lambda data: _serialize_result_and_send(results_sender, serializer, data),
                          worker_args)

    logger.debug('Starting monitor loop')
    # Run a daemon thread that watches the main process (main_process_pid) so the worker does not linger
    # if the main process dies without ever sending the 'finished' control message.
    thread = Thread(target=_monitor_thread_function, args=(main_process_pid,))
    thread.daemon = True
    thread.start()
    # Loop and accept messages from both channels, acting accordingly
    logger.debug('Entering worker loop')
    while True:
        logger.debug('Polling new message')
        socks = dict(poller.poll())

        # If the message came from work_receiver channel
        if socks.get(work_receiver) == zmq.POLLIN:
            try:
                args, kargs = work_receiver.recv_pyobj()
                logger.debug('Starting worker.process')
                worker.process(*args, **kargs)
                logger.debug('Finished worker.process')
                results_sender.send_multipart([none_marker, pickle.dumps(VentilatedItemProcessedMessage())])
                logger.debug('Sending result')
            except Exception as e:  # pylint: disable=broad-except
                stderr_message = 'Worker %d terminated: unexpected exception:\n' % worker_id
                stderr_message += format_exc()
                logger.debug('worker.process failed with exception %s', stderr_message)
                sys.stderr.write(stderr_message)
                results_sender.send_multipart([none_marker, pickle.dumps(e)])
                return

        # If the message came over the control channel, shut down the worker.
        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv_string()
            logger.debug('Received control message %s', control_message)
            if control_message == _CONTROL_FINISHED:
                worker.shutdown()
                break
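

# _monitor_thread_function is referenced above but is not part of this excerpt. The sketch below shows one
# possible shape for such a watchdog, assuming all it has to do is poll main_process_pid and terminate the
# worker once the parent disappears. It is an illustration, not petastorm's actual implementation: the name
# _example_parent_watchdog and the one-second polling interval are made up, and os.kill(pid, 0) is used only
# as a POSIX liveness probe.
import os
import time


def _example_parent_watchdog(main_process_pid):
    """Runs in a daemon thread; exits the worker process once the parent process is gone."""
    while True:
        try:
            # Signal 0 delivers nothing; it only checks that the pid still exists and is accessible.
            os.kill(main_process_pid, 0)
        except OSError:
            # The parent is gone. The main thread may be blocked in poller.poll(), so exit hard.
            os._exit(1)
        time.sleep(1)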
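

# For orientation, a minimal driver for this bootstrap could look roughly like the sketch below. It is a
# standalone illustration of the PUSH/PULL/PUB wiring the worker expects, not petastorm's actual ProcessPool:
# the ipc endpoints, _ExampleWorker and _example_driver are made-up names, and the example worker publishes
# nothing through the data callback, so the only messages on the results socket are the 'started' indicator
# and the per-item VentilatedItemProcessedMessage acknowledgement (or a pickled exception). It also assumes
# the default fork start method so _worker_bootstrap and _ExampleWorker are visible in the child process.
from multiprocessing import Process


class _ExampleWorker(object):
    """Hypothetical worker: prints the task instead of publishing data back to the consumer."""

    def __init__(self, worker_id, publish_func, worker_args):
        self._worker_id = worker_id
        self._publish_func = publish_func
        self._worker_args = worker_args

    def process(self, task):
        print('worker %d got task %r' % (self._worker_id, task))

    def shutdown(self):
        pass


def _example_driver():
    context = zmq.Context()

    # Bind the three endpoints the worker connects to.
    ventilator = context.socket(zmq.PUSH)          # tasks -> the worker's work_receiver
    ventilator.bind('ipc:///tmp/example_tasks')
    results_receiver = context.socket(zmq.PULL)    # the worker's results_sender -> this consumer
    results_receiver.bind('ipc:///tmp/example_results')
    control_sender = context.socket(zmq.PUB)       # control messages -> the worker's control_receiver
    control_sender.bind('ipc:///tmp/example_control')

    worker_process = Process(
        target=_worker_bootstrap,
        args=(_ExampleWorker, 0, 'ipc:///tmp/example_control', 'ipc:///tmp/example_tasks',
              'ipc:///tmp/example_results', os.getpid(), None, None))
    worker_process.start()

    # The first message from the worker is the 'started' indicator sent with send_pyobj.
    results_receiver.recv_pyobj()

    # Ventilate one task; the worker unpacks it as 'args, kargs = work_receiver.recv_pyobj()'.
    ventilator.send_pyobj((('some task',), {}))

    # Wait for the per-item acknowledgement (or a pickled exception) and unpickle the last frame.
    frames = results_receiver.recv_multipart()
    payload = pickle.loads(frames[-1])
    if isinstance(payload, Exception):
        raise payload

    # Ask the worker to shut down and wait for it to exit. By this point the worker's SUB socket has long
    # since connected, so the PUB message is not lost to the slow-joiner problem.
    control_sender.send_string(_CONTROL_FINISHED)
    worker_process.join()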