QueueMessage IPCMessageQueue::recv_message()

in source/neuropod/multiprocess/mq/ipc_message_queue_impl.hh [294:342]


QueueMessage<UserPayloadType> IPCMessageQueue<UserPayloadType>::recv_message()
{
    // Pops the next message off `out_queue_` and returns it wrapped in a `QueueMessage`.
    // If the message is flagged `requires_done_msg`, a `DONE` message carrying its id is
    // sent through this queue once the last reference to the received message is released.

    // Check that the other process is still alive before reading anything
    throw_if_lost_heartbeat();

    // Pull the next message off the queue
    std::unique_ptr<WireFormat> incoming;
    out_queue_.pop(incoming);

    if (!incoming)
    {
        // Nothing was handed back: communication with the other process was lost mid-read
        NEUROPOD_ERROR("OPE lost communication with the other process. See logs for more details.");
    }

    SPDLOG_TRACE("OPE: Received user payload of type: {} (requires done: {})",
                 incoming->payload_type,
                 incoming->requires_done_msg);

    // The deleter below holds its own reference to this queue so the queue stays
    // alive for as long as the received message does
    auto self = this->shared_from_this();

    // Deleter for the received message: acks it (if requested) and then frees it
    auto ack_then_free = [self](WireFormat *received) {
        if (received->requires_done_msg)
        {
            // Let the other process know we are finished reading this message so
            // that it can free any resources associated with it
            WireFormat done_msg;
            done_msg.type = detail::DONE;

            // The ack's payload is the id of the message being acknowledged
            detail::Transferrables attachments;
            detail::serialize_payload(received->id, done_msg, attachments);

            if (!attachments.empty())
            {
                // A `DONE` with transferrables would need its own `DONE`, and so on forever
                NEUROPOD_ERROR("[OPE] Transferrables must be empty when sending a `DONE` message.");
            }

            self->send_message(done_msg);
        }

        delete received;
    };

    // Hand ownership of the raw message over to a shared ptr that uses the acking deleter
    std::shared_ptr<WireFormat> acked_on_release(incoming.release(), std::move(ack_then_free));

    return QueueMessage<UserPayloadType>(std::move(acked_on_release));
}