remote/handlers/SharedBufferManager.cpp (122 lines of code) (raw):

#include "SharedBufferManager.h"

#include <atomic>
#include <chrono>
#include <cstddef>
#include <exception>
#include <string>
#include <utility>

#include "../Utils.h"
#include "../log/Log.h"

#include <boost/date_time/posix_time/posix_time.hpp>

using namespace boost::interprocess;

namespace {
  // Enables verbose tracing of shared buffer allocations (read once at startup).
  const bool doTrace = getBoolEnv("CEF_SERVER_TRACE_SharedBuffer");

  // Rounds `len` up to the next 512 Kb boundary that is strictly greater than
  // `len`; a length that is already an exact multiple still gains one more
  // 512 Kb step.
  size_t nearestMemorySize(size_t len) {
    constexpr int latticeSizeBits = 19; // i.e. 512 Kb
    return ((len >> latticeSizeBits) + 1) << latticeSizeBits;
  }

  // Returns the number of extra bytes added to every shared segment on top of
  // the requested payload size. The value is read from
  // CEF_SERVER_ADDITIONAL_SHARED_BYTES once (function-local static, so the
  // initialization is thread-safe) and clamped to [0, 8 Kb].
  int additionalSharedBytes() {
    static const int bytes = [] {
      const long defVal = 300; // NOTE: 256 isn't enough in Ubuntu24 arm64
      long value = getLongEnv("CEF_SERVER_ADDITIONAL_SHARED_BYTES", defVal);
      if (value != defVal) {
        // Clamp before narrowing to int so that huge values cannot wrap.
        if (value < 0)
          value = 0;
        if (value > 1024 * 8)
          value = 1024 * 8;
        Log::debug("SharedBuffer: set additional bytes for shared memory: %d bytes\n", static_cast<int>(value));
      }
      return static_cast<int>(value);
    }();
    return bytes;
  }
}

// Creates a named shared memory segment of `len` bytes (plus a small amount of
// additional bytes) and a named mutex with the same name `uid`. Stale objects
// with the same name are removed first.
//
// Throws whatever Boost.Interprocess throws when the segment, the allocation
// or the mutex cannot be created; everything acquired so far is released
// before the exception leaves the constructor.
SharedBuffer::SharedBuffer(std::string uid, size_t len) : myUid(std::move(uid)), myLen(len) {
  // Give the cleanup path well-defined pointers even if construction fails
  // half-way.
  mySharedSegment = nullptr;
  mySharedMem = nullptr;
  myMutex = nullptr;

  const int additionalBytes = additionalSharedBytes();
  const char* const name = myUid.c_str();

  if (doTrace)
    Log::trace("SharedBuffer: allocate shared buffer '%s' | %.2f Mb", name, len / (1024 * 1024.f));

  using Clock = std::chrono::steady_clock;
  const Clock::time_point startTime = Clock::now();
  Clock::time_point t1, t2, t3, t4;

  try {
    shared_memory_object::remove(name);
    t1 = Clock::now();

#ifdef WIN32
    mySharedSegment = new managed_windows_shared_memory(create_only, name, len + additionalBytes);
#else
    mySharedSegment = new managed_shared_memory(create_only, name, len + additionalBytes);
#endif
    t2 = Clock::now();

    mySharedMem = mySharedSegment->allocate(len);
    mySharedMemHandle = mySharedSegment->get_handle_from_address(mySharedMem);
    t3 = Clock::now();

    named_mutex::remove(name);
    t4 = Clock::now();

    myMutex = new named_mutex(create_only, name);
  } catch (...) {
    // The destructor is not invoked for a partially constructed object, so
    // release the segment/mutex here to avoid leaking them, then let the
    // caller handle the error.
    _releaseShared();
    throw;
  }

  if (doTrace && Log::isTraceEnabled()) {
    const Clock::time_point endTime = Clock::now();
    const auto toMcs = [](Clock::duration d) {
      return std::chrono::duration_cast<std::chrono::microseconds>(d).count();
    };
    // Report only slow constructions: more than 5000 microseconds (5 ms).
    const long long spentMcs = toMcs(endTime - startTime);
    if (spentMcs > 5 * 1000) {
      Log::trace("SharedBuffer '%s': %llu bytes, ctor spent mcs: remove mem %d; ctor %d; alloc %d; remove mutex %d; mutex ctor %d",
                 name, static_cast<unsigned long long>(len),
                 static_cast<int>(toMcs(t1 - startTime)),
                 static_cast<int>(toMcs(t2 - t1)),
                 static_cast<int>(toMcs(t3 - t2)),
                 static_cast<int>(toMcs(t4 - t3)),
                 static_cast<int>(toMcs(endTime - t4)));
    }
  }
}

// Destroys the local segment/mutex objects (if any) and removes the named
// shared memory object and named mutex. Safe to call on a partially
// constructed buffer and safe to call more than once.
void SharedBuffer::_releaseShared() {
  if (mySharedSegment != nullptr) {
    // TODO: remove unnecessary dealloc (since going to remove whole shared
    // segment)
    if (mySharedMem != nullptr)
      mySharedSegment->deallocate(mySharedMem);
    delete mySharedSegment;
    mySharedSegment = nullptr;
  }
  mySharedMem = nullptr;

  if (myMutex != nullptr) {
    delete myMutex;
    myMutex = nullptr;
  }

  shared_memory_object::remove(myUid.c_str());
  named_mutex::remove(myUid.c_str());
}

// Blocks until the buffer's named mutex is acquired (no-op without a mutex).
void SharedBuffer::lock() {
  if (myMutex != nullptr)
    myMutex->lock();
}

// Tries to acquire the buffer's named mutex without blocking.
// Returns false when the mutex is held elsewhere or does not exist.
bool SharedBuffer::tryLock() {
  return myMutex != nullptr ? myMutex->try_lock() : false;
}

// Releases the buffer's named mutex (no-op without a mutex).
void SharedBuffer::unlock() {
  if (myMutex != nullptr)
    myMutex->unlock();
}

SharedBuffer::~SharedBuffer() {
  _releaseShared();
}

// Returns the pool buffer at `index`, (re)creating it when the slot is empty
// or the existing buffer is smaller than `size`.
// Returns nullptr when the shared buffer could not be allocated; the error is
// logged and the pool slot is left empty.
SharedBuffer* SharedBufferManager::_getOrCreateBuffer(size_t size, int index) {
  SharedBuffer* buf = myPool[index];
  if (buf != nullptr && buf->size() >= size)
    return buf;

  if (buf != nullptr) {
    delete buf;
    myPool[index] = buf = nullptr;
  }

  try {
    // NOTE:
    // Allocation of shared memory can fail with exception.
    // Use unique name for each buffer to avoid filename collisions.
    static std::atomic<int> counter(0);
    myPool[index] = buf = new SharedBuffer(string_format("R%d_%d", utils::GetPid(), counter.fetch_add(1)),
                                           nearestMemorySize(size));
  } catch (const std::exception& e) {
    Log::error("SharedBuffer: exception during shared buffer allocation, err: %s", e.what());
  } catch (...) {
    Log::error("SharedBuffer: unknown exception during shared buffer allocation");
  }
  return buf;
}

// Picks the next buffer of the pool (round-robin), makes sure it can hold
// `size` bytes and returns it locked. If the selected buffer is busy, the
// following pool slot is used instead and the call blocks until that buffer
// is locked.
// Returns nullptr when a buffer could not be allocated.
SharedBuffer* SharedBufferManager::getLockedBuffer(size_t size) {
  myLastUsed = (myLastUsed + 1) % POOL_SIZE;
  SharedBuffer* buf = _getOrCreateBuffer(size, myLastUsed);
  if (buf == nullptr)
    return nullptr;

  if (!buf->tryLock()) {
    // It seems that selected buffer is used now. Select another.
    myLastUsed = (myLastUsed + 1) % POOL_SIZE;
    buf = _getOrCreateBuffer(size, myLastUsed);
    // Allocation of the fallback buffer may fail as well; do not dereference
    // a null pointer in that case.
    if (buf == nullptr)
      return nullptr;
    buf->lock();
  }
  return buf;
}

// Destroys every buffer that is still held by the pool.
SharedBufferManager::~SharedBufferManager() {
  for (int c = 0; c < POOL_SIZE; ++c) {
    if (myPool[c] != nullptr) {
      delete myPool[c];
      myPool[c] = nullptr;
    }
  }
}