void IPCMessageQueue::read_worker_loop()

in source/neuropod/multiprocess/mq/ipc_message_queue_impl.hh [61:150]


void IPCMessageQueue<UserPayloadType>::read_worker_loop()
{
    while (true)
    {
        // Compute the timeout
        auto timeout_at = boost::interprocess::microsec_clock::universal_time() +
                          boost::posix_time::milliseconds(detail::MESSAGE_TIMEOUT_MS);

        // Get a message
        auto         received = stdx::make_unique<WireFormat>();
        size_t       received_size;
        unsigned int priority;
        bool         successful_read =
            recv_queue_->timed_receive(received.get(), sizeof(WireFormat), received_size, priority, timeout_at);

        if (!successful_read)
        {
            // We timed out
            SPDLOG_ERROR("Timed out waiting for a response from worker process. "
                         "Didn't receive a message in {}ms, but expected a heartbeat every {}ms.",
                         detail::MESSAGE_TIMEOUT_MS,
                         detail::HEARTBEAT_INTERVAL_MS);

            // Set a flag so that any pending writers know
            lost_heartbeat_.store(true, std::memory_order_relaxed);

            // Let any pending readers know
            // (Because recv isn't threadsafe, there should only be one)
            out_queue_.try_emplace(nullptr);

            // Shutdown this thread
            break;
        }

        if (received->type == detail::USER_PAYLOAD)
        {
            SPDLOG_TRACE("OPE: Read thread received user payload {}.", received->payload_type);
        }
        else
        {
            SPDLOG_TRACE("OPE: Read thread received IPC control message {}.", received->type);
        }

        if (received->type == detail::HEARTBEAT)
        {
            // This is a heartbeat message so continue
            continue;
        }
        else if (received->type == detail::DONE)
        {
            // Handle DONE messages by erasing all the in_transit items for that message
            uint64_t acked_id;
            detail::deserialize_payload(*received, acked_id);

            transferrable_controller_->done(acked_id);
        }
        else if (received->type == detail::SHUTDOWN_QUEUES)
        {
            // Start a shutdown.
            shutdown_started_ = true;

            // Note: we're using the `try_` variant to avoid blocking shutdown here
            // Note: out_queue_ should only have one listener at any given time
            // (since recv isn't threadsafe). Because of this, this message should
            // wake up a user thread that is blocked (if any)
            out_queue_.try_emplace(std::move(received));
        }
        else
        {
            // This is a user-handled message
            out_queue_.emplace(std::move(received));
        }

        if (shutdown_started_)
        {
            // Only shutdown once we've received DONEs for all the messages we've sent
            const auto in_transit_count = transferrable_controller_->size();
            if (in_transit_count == 0)
            {
                // We can finish shutting down
                break;
            }
            else
            {
                SPDLOG_TRACE("OPE: Tried to shut down read worker thread, but still waiting on {} `DONE` messages.",
                             in_transit_count);
            }
        }
    }
}