def _worker_bootstrap()

in petastorm/workers_pool/process_pool.py


def _worker_bootstrap(worker_class, worker_id, control_socket, worker_receiver_socket, results_sender_socket,
                      main_process_pid, serializer, worker_args):
    """This is the root of the spawned worker processes.

    :param worker_class: A class with worker implementation.
    :param worker_id: An integer. Unique for each worker.
    :param control_socket: A zmq socket used to control the worker (currently supports only the
        ``_CONTROL_FINISHED`` message)
    :param worker_receiver_socket: A zmq socket used to deliver tasks to the worker
    :param results_sender_socket: A zmq socket used to deliver the work products to the consumer
    :param main_process_pid: PID of the main process; a monitor thread watches it so the worker does not
        outlive the main process
    :param serializer: A serializer object (with serialize/deserialize methods) or ``None``.
    :param worker_args: Application-specific parameters passed to the worker constructor
    :return: ``None``
    """
    logger.debug('Starting _worker_bootstrap')
    context = zmq.Context()

    logger.debug('Connecting sockets')
    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
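    # linger=0: when a socket is closed, any unsent messages are dropped instead of blocking shutdown.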
    work_receiver.linger = 0
    work_receiver.connect(worker_receiver_socket)

    # Set up a channel to send result of work to the results reporter
    results_sender = context.socket(zmq.PUSH)
    results_sender.linger = 0
    results_sender.connect(results_sender_socket)

    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)
    control_receiver.linger = 0
    control_receiver.connect(control_socket)
    _setsockopt(control_receiver, zmq.SUBSCRIBE, b"")
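    # An empty SUBSCRIBE prefix means this worker receives every message published on the control channel.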

    logger.debug('Setting up poller')
    # Set up a poller to multiplex the work receiver and control receiver channels
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    poller.register(control_receiver, zmq.POLLIN)

    results_sender.send_pyobj(_WORKER_STARTED_INDICATOR)
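    # The indicator above lets the consumer side know this worker process came up and connected its sockets.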

    # Use this 'none_marker' as the first frame in the send_multipart calls below.
    none_marker = bytes()

    logger.debug('Instantiating a worker')
    # Instantiate a worker
    worker = worker_class(worker_id, lambda data: _serialize_result_and_send(results_sender, serializer, data),
                          worker_args)

    logger.debug('Starting monitor loop')
    thread = Thread(target=_monitor_thread_function, args=(main_process_pid,))
    thread.daemon = True
    thread.start()
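    # The daemon thread monitors main_process_pid so this worker can exit if the main process dies.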

    # Loop and accept messages from both channels, acting accordingly
    logger.debug('Entering worker loop')
    while True:
        logger.debug('Polling new message')
        socks = dict(poller.poll())
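        # poll() blocks until at least one of the registered sockets (work or control) has a pending message.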

        # If the message came from work_receiver channel
        if socks.get(work_receiver) == zmq.POLLIN:
            try:
                args, kargs = work_receiver.recv_pyobj()
                logger.debug('Starting worker.process')
                worker.process(*args, **kargs)
                logger.debug('Finished worker.process')
                logger.debug('Sending result')
                results_sender.send_multipart([none_marker, pickle.dumps(VentilatedItemProcessedMessage())])
            except Exception as e:  # pylint: disable=broad-except
                stderr_message = 'Worker %d terminated: unexpected exception:\n' % worker_id
                stderr_message += format_exc()
                logger.debug('worker.process failed with exception %s', stderr_message)
                sys.stderr.write(stderr_message)
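                # Ship the exception object to the consumer side so the failure can be surfaced there,
                # then terminate this worker process.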
                results_sender.send_multipart([none_marker, pickle.dumps(e)])
                return

        # If the message came over the control channel, shut down the worker.
        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv_string()
            logger.debug('Received control message %s', control_message)
            if control_message == _CONTROL_FINISHED:
                worker.shutdown()
                break
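

The three connect calls above mirror ZeroMQ's classic ventilator/sink pattern: the pool PUSHes tasks into each worker's PULL socket, every worker PUSHes results back to a single PULL sink, and a PUB socket broadcasts control messages to the workers' SUB sockets. The sketch below is a minimal, hypothetical illustration of wiring a single worker by hand; the endpoint addresses, the EchoWorker class, and the use of multiprocessing.Process are assumptions for demonstration only, and the real pool in process_pool.py does its own process spawning and socket management.

# Hypothetical wiring sketch -- endpoints, EchoWorker and multiprocessing usage are
# illustrative assumptions, not how the pool itself launches workers.
import os
from multiprocessing import Process

import zmq

from petastorm.workers_pool.process_pool import _CONTROL_FINISHED, _worker_bootstrap


class EchoWorker(object):
    """Minimal worker exposing the interface _worker_bootstrap expects."""

    def __init__(self, worker_id, publish_func, worker_args):
        self._worker_id = worker_id
        self._publish = publish_func  # forwards a result through results_sender_socket
        self._worker_args = worker_args

    def process(self, value):
        self._publish({'worker': self._worker_id, 'value': value})

    def shutdown(self):
        pass


if __name__ == '__main__':
    context = zmq.Context()

    # The parent side binds; the worker's sockets connect to these endpoints.
    ventilator = context.socket(zmq.PUSH)   # feeds the worker's PULL work_receiver
    ventilator.bind('tcp://127.0.0.1:5557')
    results = context.socket(zmq.PULL)      # collects from the worker's PUSH results_sender
    results.bind('tcp://127.0.0.1:5558')
    control = context.socket(zmq.PUB)       # broadcasts to the worker's SUB control_receiver
    control.bind('tcp://127.0.0.1:5559')

    worker_process = Process(
        target=_worker_bootstrap,
        args=(EchoWorker, 0, 'tcp://127.0.0.1:5559', 'tcp://127.0.0.1:5557',
              'tcp://127.0.0.1:5558', os.getpid(), None, None))
    worker_process.start()

    print(results.recv_pyobj())             # _WORKER_STARTED_INDICATOR

    # A ventilated task is an (args, kwargs) tuple, matching work_receiver.recv_pyobj() above.
    ventilator.send_pyobj((['hello'], {}))

    # Frame layout of the published result is defined by _serialize_result_and_send (not shown);
    # the second message is [none_marker, pickled VentilatedItemProcessedMessage].
    print(results.recv_multipart())
    print(results.recv_multipart())

    control.send_string(_CONTROL_FINISHED)
    worker_process.join()

Whether processing succeeds or raises, the worker always sends a terminating message (a VentilatedItemProcessedMessage or a pickled exception), which lets the consumer side account for every ventilated item.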