remote/handlers/SharedBufferManager.cpp (122 lines of code) (raw):
#include "SharedBufferManager.h"
#include "../Utils.h"
#include "../log/Log.h"
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace boost::interprocess;
namespace {
const bool doTrace = getBoolEnv("CEF_SERVER_TRACE_SharedBuffer");
size_t nearestMemorySize(size_t len) {
constexpr int latticeSizeBits = 19; // i.e. 512 Kb
return ((len >> latticeSizeBits) + 1) << latticeSizeBits;
}
}
SharedBuffer::SharedBuffer(std::string uid, size_t len)
: myUid(uid), myLen(len) {
static int additionalBytes = -1;
if (additionalBytes < 0) {
const long defVal = 300; // NOTE: 256 isn't enough in Ubuntu24 arm64
additionalBytes = getLongEnv("CEF_SERVER_ADDITIONAL_SHARED_BYTES", defVal);
if (additionalBytes != defVal) {
if (additionalBytes < 0) additionalBytes = 0;
if (additionalBytes > 1024*8) additionalBytes = 1024*8;
Log::debug("SharedBuffer: set additional bytes for shared memory: %d bytes\n", additionalBytes);
}
}
if (doTrace)
Log::trace("SharedBuffer: allocate shared buffer '%s' | %.2f Mb", uid.c_str(), len/(1024*1024.f));
const std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
shared_memory_object::remove(uid.c_str());
const std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
// TODO: check allocation errors, catch and process exceptions
#ifdef WIN32
mySharedSegment = new managed_windows_shared_memory(create_only, uid.c_str(),
len + additionalBytes);
#else
mySharedSegment = new managed_shared_memory(create_only, uid.c_str(),len + additionalBytes);
#endif
const std::chrono::steady_clock::time_point t2 = std::chrono::steady_clock::now();
mySharedMem = mySharedSegment->allocate(len);
mySharedMemHandle = mySharedSegment->get_handle_from_address(mySharedMem);
const std::chrono::steady_clock::time_point t3 = std::chrono::steady_clock::now();
named_mutex::remove(myUid.c_str());
const std::chrono::steady_clock::time_point t4 = std::chrono::steady_clock::now();
myMutex = new named_mutex(create_only, myUid.c_str());
if (doTrace && Log::isTraceEnabled()) {
const std::chrono::steady_clock::time_point entTime = std::chrono::steady_clock::now();
const long spentMs = (long)std::chrono::duration_cast<std::chrono::microseconds>(entTime - startTime).count();
if (spentMs > 5*1000) {
auto d1 = std::chrono::duration_cast<std::chrono::microseconds>(t1 - startTime);
auto d2 = std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1);
auto d3 = std::chrono::duration_cast<std::chrono::microseconds>(t3 - t2);
auto d4 = std::chrono::duration_cast<std::chrono::microseconds>(t4 - t3);
auto d5 = std::chrono::duration_cast<std::chrono::microseconds>(entTime - t4);
Log::trace("SharedBuffer '%s': %d bytes, ctor spent mcs: remove mem %d; ctor %d; alloc %d; remove mutex %d; mutex ctor %d",
uid.c_str(), len, (int)d1.count(), (int)d2.count(), (int)d3.count(), (int)d4.count(), (int)d5.count());
}
}
}
void SharedBuffer::_releaseShared() {
if (mySharedSegment != nullptr) {
// TODO: remove unnecessary dealloc (since going to remove whole shared
// segment)
mySharedSegment->deallocate(mySharedMem);
delete mySharedSegment;
mySharedSegment = nullptr;
mySharedMem = nullptr;
}
if (myMutex != nullptr) {
delete myMutex;
myMutex = nullptr;
}
shared_memory_object::remove(myUid.c_str());
named_mutex::remove(myUid.c_str());
}
void SharedBuffer::lock() {
if (myMutex != nullptr)
myMutex->lock();
}
bool SharedBuffer::tryLock() {
return myMutex != nullptr ? myMutex->try_lock() : false;
}
void SharedBuffer::unlock() {
if (myMutex != nullptr)
myMutex->unlock();
}
SharedBuffer::~SharedBuffer() {
_releaseShared();
}
SharedBuffer* SharedBufferManager::_getOrCreateBuffer(size_t size, int index) {
SharedBuffer* buf = myPool[index];
if (buf == nullptr || buf->size() < size) {
if (buf != nullptr) {
delete buf;
myPool[index] = buf = nullptr;
}
try {
// NOTE:
// Allocation of shared memory can fail with exception.
// Use unique name for each buffer to avoid filename collisions.
static std::atomic<int> counter(0);
myPool[index] = buf =
new SharedBuffer(string_format("R%d_%d", utils::GetPid(), counter.fetch_add(1)),nearestMemorySize(size));
} catch (const std::exception& e) {
Log::error("SharedBuffer: exception during shared buffer allocation, err: %s", e.what());
} catch (...) {
Log::error("SharedBuffer: unknown exception during shared buffer allocation");
}
}
return buf;
}
SharedBuffer* SharedBufferManager::getLockedBuffer(size_t size) {
myLastUsed = (myLastUsed + 1) % POOL_SIZE;
SharedBuffer* buf = _getOrCreateBuffer(size, myLastUsed);
if (buf == nullptr)
return nullptr;
if (!buf->tryLock()) {
// It seems that selected buffer is used now. Select another.
myLastUsed = (myLastUsed + 1) % POOL_SIZE;
buf = _getOrCreateBuffer(size, myLastUsed);
buf->lock();
}
return buf;
}
SharedBufferManager::~SharedBufferManager() {
for (int c = 0; c < POOL_SIZE; ++c)
if (myPool[c] != nullptr) {
delete myPool[c];
myPool[c] = nullptr;
}
}