in source/neuropod/multiprocess/mq/ipc_message_queue_impl.hh [294:342]
QueueMessage<UserPayloadType> IPCMessageQueue<UserPayloadType>::recv_message()
{
// Make sure the worker process is still alive
throw_if_lost_heartbeat();
// Read a message
std::unique_ptr<WireFormat> out;
out_queue_.pop(out);
if (out == nullptr)
{
// We lost communication with the other processs while we were reading
NEUROPOD_ERROR("OPE lost communication with the other process. See logs for more details.");
}
SPDLOG_TRACE(
"OPE: Received user payload of type: {} (requires done: {})", out->payload_type, out->requires_done_msg);
// Convert this to a shared ptr with a deleter that acks the message
auto shared_this = this->shared_from_this();
std::shared_ptr<WireFormat> received_shared(out.release(), [shared_this](WireFormat *msg) {
if (msg->requires_done_msg)
{
// Notify the other process that this message is done being read from
// and any associated resources can be freed
// Create a message to ack `msg`
WireFormat ack_msg;
ack_msg.type = detail::DONE;
// Serialize the payload
detail::Transferrables transferrables;
detail::serialize_payload(msg->id, ack_msg, transferrables);
if (!transferrables.empty())
{
// This must be empty otherwise we'll have an infinite DONE chain
NEUROPOD_ERROR("[OPE] Transferrables must be empty when sending a `DONE` message.");
}
// Send the message
shared_this->send_message(ack_msg);
}
delete msg;
});
return QueueMessage<UserPayloadType>(std::move(received_shared));
}