in source/neuropod/multiprocess/mq/ipc_message_queue_impl.hh [61:150]
void IPCMessageQueue<UserPayloadType>::read_worker_loop()
{
while (true)
{
// Compute the timeout
auto timeout_at = boost::interprocess::microsec_clock::universal_time() +
boost::posix_time::milliseconds(detail::MESSAGE_TIMEOUT_MS);
// Get a message
auto received = stdx::make_unique<WireFormat>();
size_t received_size;
unsigned int priority;
bool successful_read =
recv_queue_->timed_receive(received.get(), sizeof(WireFormat), received_size, priority, timeout_at);
if (!successful_read)
{
// We timed out
SPDLOG_ERROR("Timed out waiting for a response from worker process. "
"Didn't receive a message in {}ms, but expected a heartbeat every {}ms.",
detail::MESSAGE_TIMEOUT_MS,
detail::HEARTBEAT_INTERVAL_MS);
// Set a flag so that any pending writers know
lost_heartbeat_.store(true, std::memory_order_relaxed);
// Let any pending readers know
// (Because recv isn't threadsafe, there should only be one)
out_queue_.try_emplace(nullptr);
// Shutdown this thread
break;
}
if (received->type == detail::USER_PAYLOAD)
{
SPDLOG_TRACE("OPE: Read thread received user payload {}.", received->payload_type);
}
else
{
SPDLOG_TRACE("OPE: Read thread received IPC control message {}.", received->type);
}
if (received->type == detail::HEARTBEAT)
{
// This is a heartbeat message so continue
continue;
}
else if (received->type == detail::DONE)
{
// Handle DONE messages by erasing all the in_transit items for that message
uint64_t acked_id;
detail::deserialize_payload(*received, acked_id);
transferrable_controller_->done(acked_id);
}
else if (received->type == detail::SHUTDOWN_QUEUES)
{
// Start a shutdown.
shutdown_started_ = true;
// Note: we're using the `try_` variant to avoid blocking shutdown here
// Note: out_queue_ should only have one listener at any given time
// (since recv isn't threadsafe). Because of this, this message should
// wake up a user thread that is blocked (if any)
out_queue_.try_emplace(std::move(received));
}
else
{
// This is a user-handled message
out_queue_.emplace(std::move(received));
}
if (shutdown_started_)
{
// Only shutdown once we've received DONEs for all the messages we've sent
const auto in_transit_count = transferrable_controller_->size();
if (in_transit_count == 0)
{
// We can finish shutting down
break;
}
else
{
SPDLOG_TRACE("OPE: Tried to shut down read worker thread, but still waiting on {} `DONE` messages.",
in_transit_count);
}
}
}
}