in source/neuropod/multiprocess/shm/shm_allocator.cc [248:317]
std::shared_ptr<void> SHMAllocator::load_shm(const SHMBlockID &block_id)
{
// Load an existing block of shared memory by ID
auto id = reinterpret_cast<const SHMBlockIDInternal *>(block_id.data());
auto handle = id->block_handle;
// The underlying raw block
std::shared_ptr<void> raw_block;
// Check if we can get it from the cache
load_cache_->maybe_get_and_pop(handle, raw_block);
// If not, load it from scratch
if (raw_block == nullptr)
{
raw_block = allocator_.load_shm(handle);
}
// Get the block from the raw block
auto *block = static_cast<SHMBlockInternal *>(raw_block.get());
// Sanity checks to avoid race conditions + increment the refcount
{
// Lock the block's mutex
ipc::scoped_lock<ipc::interprocess_mutex> lock(block->mutex);
// Sanity check
if (block->refcount == 0)
{
// This means that the other process isn't keeping references to data long enough for this
// process to load the data.
// This can lead to some hard to debug race conditions so we always throw an error.
NEUROPOD_ERROR("Tried getting a pointer to an existing chunk of memory that has a refcount of zero: {}",
handle);
}
// Make sure the `reuse_count` matches what we expect
if (id->reuse_count != block->reuse_count)
{
// This means that the other process isn't keeping references to data long enough for this
// process to load the data.
// (A block of memory went out of scope and was reused before it could be loaded by the other process)
NEUROPOD_ERROR("Tried loading a block of memory that went out of scope in the creating process. "
"Ensure that blocks of SHM stay in scope until they are loaded by the receiving process. "
"UUID: {}. Reuse count: {}. Expected reuse count: {}",
handle,
block->reuse_count,
id->reuse_count);
}
// Increment the refcount
block->refcount++;
}
// Create a shared pointer to the underlying data with a custom deleter
// that keeps the block alive. Add the block to the cache on destruction.
return std::shared_ptr<void>(block->data,
[this, block, raw_block = std::move(raw_block), handle](void *unused) mutable {
{
// Lock the block's mutex
ipc::scoped_lock<ipc::interprocess_mutex> lock(block->mutex);
// Decrement the refcount
block->refcount--;
}
// Add it to the load cache
load_cache_->insert(handle, std::move(raw_block));
});
}