cpp/velox/memory/VeloxMemoryManager.cc (334 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "VeloxMemoryManager.h"

#include <chrono>
#include <cmath>
#include <thread>

#ifdef ENABLE_JEMALLOC_STATS
#include <jemalloc/jemalloc.h>
#endif

#include "compute/VeloxBackend.h"
#include "velox/common/memory/MallocAllocator.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/exec/MemoryReclaimer.h"

#include "config/VeloxConfig.h"
#include "memory/ArrowMemoryPool.h"
#include "utils/Exception.h"
DECLARE_int32(gluten_velox_async_timeout_on_task_stopping);
namespace gluten {
using namespace facebook;
/// Translates Gluten backend settings into the extra arbitrator configs consumed by ListenableArbitrator.
///
/// Every value is rendered as a string with an explicit unit suffix ("B" for byte capacities, "ms" for the reclaim
/// wait) so the arbitrator can parse it back with velox::config::toCapacity / toDuration. Note that the memory
/// reservation block size doubles as the pool transfer capacity.
///
/// @param backendConf Backend configuration; each missing key falls back to the default passed to get().
/// @return Map of arbitrator config key -> unit-suffixed string value.
std::unordered_map<std::string, std::string> getExtraArbitratorConfigs(
    const facebook::velox::config::ConfigBase& backendConf) {
  const auto transferCapacityBytes =
      backendConf.get<uint64_t>(kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
  const auto initialCapacityBytes = backendConf.get<uint64_t>(kVeloxMemInitCapacity, kVeloxMemInitCapacityDefault);
  const auto reclaimMaxWaitMs =
      backendConf.get<uint64_t>(kVeloxMemReclaimMaxWaitMs, kVeloxMemReclaimMaxWaitMsDefault);

  return {
      {std::string(kMemoryPoolInitialCapacity), folly::to<std::string>(initialCapacityBytes) + "B"},
      {std::string(kMemoryPoolTransferCapacity), folly::to<std::string>(transferCapacityBytes) + "B"},
      {std::string(kMemoryReclaimMaxWaitMs), folly::to<std::string>(reclaimMaxWaitMs) + "ms"},
  };
}
namespace {
/// Looks up `key` in `configs` and converts the stored string to `T` with folly::to.
///
/// @param configs Extra arbitrator configs (key -> string value).
/// @param key Config key to look up.
/// @param defaultValue Returned unchanged when `key` is absent.
/// @return The parsed value, or `defaultValue` if the key is missing.
/// @throws VeloxUserError (via VELOX_USER_FAIL) when the stored value cannot be converted to `T`.
template <typename T>
T getConfig(
    const std::unordered_map<std::string, std::string>& configs,
    const std::string_view& key,
    const T& defaultValue) {
  // A single find() replaces count() + at(): one hash lookup and one temporary key string instead of two.
  const auto it = configs.find(std::string(key));
  if (it != configs.end()) {
    try {
      return folly::to<T>(it->second);
    } catch (const std::exception& e) {
      VELOX_USER_FAIL("Failed while parsing SharedArbitrator configs: {}", e.what());
    }
  }
  return defaultValue;
}
/// Memory arbitrator that mirrors the capacity of a single Velox root pool into a Gluten AllocationListener.
/// Growing the pool charges the listener; shrinking the pool releases memory back to it.
///
/// We assume it is used within a single Spark task, i.e. exactly one root pool is registered at a time. This is
/// enforced by the VELOX_CHECKs in growCapacity/shrinkCapacity. Access to the candidate map is guarded by `mutex_`;
/// beyond that, no thread-safety is guaranteed.
class ListenableArbitrator : public velox::memory::MemoryArbitrator {
 public:
  /// @param config Arbitrator config. `extraConfigs` may carry the initial capacity, transfer capacity and reclaim
  ///     max-wait entries (see getExtraArbitratorConfigs); built-in defaults are used otherwise.
  /// @param listener Listener notified of every capacity change. Stored as a raw pointer and never deleted here.
  ListenableArbitrator(const Config& config, AllocationListener* listener)
      : MemoryArbitrator(config),
        listener_(listener),
        memoryPoolInitialCapacity_(velox::config::toCapacity(
            getConfig<std::string>(
                config.extraConfigs,
                kMemoryPoolInitialCapacity,
                std::to_string(kDefaultMemoryPoolInitialCapacity)),
            velox::config::CapacityUnit::BYTE)),
        memoryPoolTransferCapacity_(velox::config::toCapacity(
            getConfig<std::string>(
                config.extraConfigs,
                kMemoryPoolTransferCapacity,
                std::to_string(kDefaultMemoryPoolTransferCapacity)),
            velox::config::CapacityUnit::BYTE)),
        memoryReclaimMaxWaitMs_(
            std::chrono::duration_cast<std::chrono::milliseconds>(velox::config::toDuration(getConfig<std::string>(
                config.extraConfigs,
                kMemoryReclaimMaxWaitMs,
                std::string(kDefaultMemoryReclaimMaxWaitMs))))
                .count()) {}

  std::string kind() const override {
    return kind_;
  }

  void shutdown() override {}

  /// Registers `pool` as the arbitration candidate. The pool must start with zero capacity and must not already be
  /// registered.
  void addPool(const std::shared_ptr<velox::memory::MemoryPool>& pool) override {
    VELOX_CHECK_EQ(pool->capacity(), 0);
    std::unique_lock guard{mutex_};
    VELOX_CHECK_EQ(candidates_.count(pool.get()), 0);
    candidates_.emplace(pool.get(), pool->weak_from_this());
  }

  /// Releases all of `pool`'s capacity (reporting it to the listener) and unregisters the pool.
  /// The pool must have no outstanding reservation.
  void removePool(velox::memory::MemoryPool* pool) override {
    VELOX_CHECK_EQ(pool->reservedBytes(), 0);
    shrinkCapacity(pool, pool->capacity());
    std::unique_lock guard{mutex_};
    const auto ret = candidates_.erase(pool);
    VELOX_CHECK_EQ(ret, 1);
  }

  /// Grows the root of `pool` so that `targetBytes` can be reserved. `pool` must belong to the single registered
  /// root pool.
  void growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
    // Set arbitration context to allow memory over-use during recursive arbitration.
    // See MemoryPoolImpl::maybeIncrementReservation.
    velox::memory::ScopedMemoryArbitrationContext ctx{};
    velox::memory::MemoryPool* candidate = nullptr;
    {
      std::unique_lock guard{mutex_};
      VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool");
      candidate = candidates_.begin()->first;
    }
    VELOX_CHECK(pool->root() == candidate, "Illegal state in ListenableArbitrator");
    growCapacityInternal(pool->root(), targetBytes);
  }

  /// Global shrink request. Asks the single registered root pool to reclaim `targetBytes` (waiting at most
  /// `memoryReclaimMaxWaitMs_`), then releases all of its free capacity to the listener.
  /// `allowSpill` and `allowAbort` are not consulted.
  /// @return Bytes of capacity released.
  uint64_t shrinkCapacity(uint64_t targetBytes, bool allowSpill, bool allowAbort) override {
    velox::memory::ScopedMemoryArbitrationContext ctx{};
    facebook::velox::exec::MemoryReclaimer::Stats status;
    velox::memory::MemoryPool* pool = nullptr;
    {
      std::unique_lock guard{mutex_};
      VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool");
      pool = candidates_.begin()->first;
    }
    pool->reclaim(targetBytes, memoryReclaimMaxWaitMs_, status); // ignore the output
    return shrinkCapacityInternal(pool, 0);
  }

  /// Shrinks `pool` by `targetBytes` and reports the released bytes to the listener.
  uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
    return shrinkCapacityInternal(pool, targetBytes);
  }

  Stats stats() const override {
    Stats stats; // no-op
    return stats;
  }

  std::string toString() const override {
    return fmt::format("ARBITRATOR[{}] CAPACITY {} {}", kind_, velox::succinctBytes(capacity()), stats().toString());
  }

 private:
  /// Grows root `pool` so that `bytes` can be reserved, charging the listener only for newly needed capacity.
  /// If the listener refuses the allocation, the capacity reclaimed from the pool is released from the listener
  /// and the listener's exception is rethrown.
  void growCapacityInternal(velox::memory::MemoryPool* pool, uint64_t bytes) {
    // Since
    // https://github.com/facebookincubator/velox/pull/9557/files#diff-436e44b7374032f8f5d7eb45869602add6f955162daa2798d01cc82f8725724dL812-L820,
    // We should pass bytes as parameter "reservationBytes" when calling ::grow.
    const auto freeBytes = pool->freeBytes();
    if (freeBytes > bytes) {
      // Fast path: the pool already has enough unused capacity; only reserve it.
      if (growPool(pool, 0, bytes)) {
        return;
      }
    }
    // Slow path: take back all of the pool's unused capacity, then grow it again by that amount plus the shortfall.
    // shrinkPool() does not notify the listener, so only the shortfall (rounded up to the transfer capacity) is
    // newly charged below.
    const uint64_t reclaimedFreeBytes = shrinkPool(pool, 0);
    // Clamp instead of subtracting blindly. If the fast path failed despite enough free bytes, more than `bytes` may
    // have been reclaimed, and the unsigned subtraction would wrap around to an enormous listener request.
    const uint64_t shortfallBytes = bytes > reclaimedFreeBytes ? bytes - reclaimedFreeBytes : uint64_t{0};
    const uint64_t neededBytes = velox::bits::roundUp(shortfallBytes, memoryPoolTransferCapacity_);
    try {
      listener_->allocationChanged(static_cast<int64_t>(neededBytes));
    } catch (const std::exception&) {
      // if allocationChanged failed, we need to free the reclaimed bytes: they will not be re-granted to the pool.
      listener_->allocationChanged(-static_cast<int64_t>(reclaimedFreeBytes));
      throw;
    }
    const bool grown = growPool(pool, reclaimedFreeBytes + neededBytes, bytes);
    VELOX_CHECK(
        grown,
        "{} failed to grow {} bytes, current state {}",
        pool->name(),
        velox::succinctBytes(bytes),
        pool->toString());
  }

  /// Shrinks `pool` by `bytes` (0 releases all free capacity, as used by the global shrink above) and reports the
  /// released amount to the listener as a negative change.
  uint64_t shrinkCapacityInternal(velox::memory::MemoryPool* pool, uint64_t bytes) {
    const uint64_t freeBytes = shrinkPool(pool, bytes);
    // Negate in the signed domain rather than wrapping the unsigned value.
    listener_->allocationChanged(-static_cast<int64_t>(freeBytes));
    return freeBytes;
  }

  gluten::AllocationListener* listener_ = nullptr;
  const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused.
  const uint64_t memoryPoolTransferCapacity_;
  const uint64_t memoryReclaimMaxWaitMs_;

  mutable std::mutex mutex_;
  inline static std::string kind_ = "GLUTEN";
  // Registered root pools; at most one is expected (see class comment).
  std::unordered_map<velox::memory::MemoryPool*, std::weak_ptr<velox::memory::MemoryPool>> candidates_;
};
} // namespace
ArbitratorFactoryRegister::ArbitratorFactoryRegister(gluten::AllocationListener* listener) : listener_(listener) {
  // Each instance registers its factory under a process-unique kind derived from a global counter.
  static std::atomic_uint32_t id{0UL};
  const uint32_t sequence = id++;
  kind_ = "GLUTEN_ARBITRATOR_FACTORY_" + std::to_string(sequence);

  // The factory reads listener_ through `this`. The destructor unregisters it, so it must not outlive this object.
  auto makeArbitrator =
      [this](const velox::memory::MemoryArbitrator::Config& config) -> std::unique_ptr<velox::memory::MemoryArbitrator> {
    return std::make_unique<ListenableArbitrator>(config, listener_);
  };
  velox::memory::MemoryArbitrator::registerFactory(kind_, makeArbitrator);
}
ArbitratorFactoryRegister::~ArbitratorFactoryRegister() {
  // Remove the factory registered by the constructor; its lambda captures `this` and must not be invoked afterwards.
  const std::string& registeredKind = kind_;
  velox::memory::MemoryArbitrator::unregisterFactory(registeredKind);
}
/// Builds both allocation paths of this manager on top of the same `listener`:
///  - Gluten/Arrow allocations go through a ListenableMemoryAllocator backed by a BlockAllocationListener.
///  - Velox allocations go through a dedicated Velox MemoryManager whose arbitrator is a ListenableArbitrator.
VeloxMemoryManager::VeloxMemoryManager(
    const std::string& kind,
    std::unique_ptr<AllocationListener> listener,
    const facebook::velox::config::ConfigBase& backendConf)
    : MemoryManager(kind), listener_(std::move(listener)) {
  // Gluten-side path: default allocator -> block-granular listener -> shared listener.
  const auto blockSize = backendConf.get<uint64_t>(kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
  blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), blockSize);
  listenableAlloc_ = std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), blockListener_.get());
  arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());

  const auto leakCheckEnabled = backendConf.get<bool>(kCheckUsageLeak, kCheckUsageLeakDefault);

  // Velox-side path. The factory registration lives only for the rest of this constructor; it must exist while the
  // Velox MemoryManager below is built from `arbitratorKind` (presumably that is when the arbitrator is created).
  ArbitratorFactoryRegister factoryRegistration(listener_.get());
  velox::memory::MemoryManagerOptions options{
      .alignment = velox::memory::MemoryAllocator::kMaxAlignment,
      .trackDefaultUsage = true, // memory usage tracking
      .checkUsageLeak = leakCheckEnabled, // leak check
      .coreOnAllocationFailureEnabled = false,
      .allocatorCapacity = velox::memory::kMaxMemory,
      .arbitratorKind = factoryRegistration.getKind(),
      .extraArbitratorConfigs = getExtraArbitratorConfigs(backendConf)};
  veloxMemoryManager_ = std::make_unique<velox::memory::MemoryManager>(options);

  veloxAggregatePool_ = veloxMemoryManager_->addRootPool(
      "root",
      velox::memory::kMaxMemory, // max capacity of the root pool
      facebook::velox::memory::MemoryReclaimer::create());
  veloxLeafPool_ = veloxAggregatePool_->addLeafChild("default_leaf");
}
namespace {
/// Recursively snapshots `pool` (used and peak bytes) together with all of its descendants, keyed by pool name.
MemoryUsageStats collectVeloxMemoryUsageStats(const velox::memory::MemoryPool* pool) {
  MemoryUsageStats snapshot;
  snapshot.set_current(pool->usedBytes());
  snapshot.set_peak(pool->peakBytes());
  auto* childStats = snapshot.mutable_children();
  pool->visitChildren([childStats](velox::memory::MemoryPool* child) -> bool {
    childStats->emplace(child->name(), collectVeloxMemoryUsageStats(child));
    return true; // keep visiting the remaining children
  });
  return snapshot;
}
/// Snapshots the current and peak byte counts of a Gluten allocator. No children are reported.
MemoryUsageStats collectGlutenAllocatorMemoryUsageStats(const MemoryAllocator* allocator) {
  const auto currentBytes = allocator->getBytes();
  const auto peakBytes = allocator->peakBytes();
  MemoryUsageStats snapshot;
  snapshot.set_current(currentBytes);
  snapshot.set_peak(peakBytes);
  return snapshot;
}
/// Asks the arbitrator of `mm` to release free capacity of `pool` via shrinkCapacity(pool, 0).
/// `size` is only reported in the logs; it does not limit how much is released.
/// @return The number of bytes the arbitrator reported as released.
int64_t shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::MemoryPool* pool, int64_t size) {
  const std::string logPrefix = "Shrink[" + pool->root()->name() + "/" + pool->name() + "]: ";
  VLOG(2) << logPrefix << "Trying to shrink " << size << " bytes of data...";
  VLOG(2) << logPrefix << "Pool has reserved " << pool->usedBytes() << "/" << pool->root()->reservedBytes() << "/"
          << pool->root()->capacity() << "/" << pool->root()->maxCapacity() << " bytes.";
  VLOG(2) << logPrefix << "Shrinking...";
  const auto released = mm->arbitrator()->shrinkCapacity(pool, 0);
  VLOG(2) << logPrefix << released << " bytes released from shrinking.";
  return released;
}
} // namespace
/// Reports usage as a tree. The top-level numbers come from the shared listener; the children are the Gluten
/// allocator and the Velox root pool (with its descendants).
const MemoryUsageStats VeloxMemoryManager::collectMemoryUsageStats() const {
  MemoryUsageStats usage;
  usage.set_current(listener_->currentBytes());
  usage.set_peak(listener_->peakBytes());
  auto* children = usage.mutable_children();
  children->emplace("gluten::MemoryAllocator", collectGlutenAllocatorMemoryUsageStats(listenableAlloc_.get()));
  children->emplace(veloxAggregatePool_->name(), collectVeloxMemoryUsageStats(veloxAggregatePool_.get()));
  return usage;
}
const int64_t VeloxMemoryManager::shrink(int64_t size) {
return shrinkVeloxMemoryPool(veloxMemoryManager_.get(), veloxAggregatePool_.get(), size);
}
namespace {
/// Appends a strong reference to every descendant of `pool` to `heldVeloxPools`, in pre-order: each child is added
/// before its own descendants. `pool` itself is not added.
void holdInternal(
    std::vector<std::shared_ptr<facebook::velox::memory::MemoryPool>>& heldVeloxPools,
    const velox::memory::MemoryPool* pool) {
  pool->visitChildren([&heldVeloxPools](velox::memory::MemoryPool* child) -> bool {
    heldVeloxPools.emplace_back(child->shared_from_this());
    holdInternal(heldVeloxPools, child);
    return true;
  });
}
} // namespace
void VeloxMemoryManager::hold() {
holdInternal(heldVeloxPools_, veloxAggregatePool_.get());
}
/// Attempts to release every memory resource owned by this manager.
///
/// Returns true once the held Velox pools, the leaf and root pools, the Velox memory manager and the Arrow pool have
/// all been released. Returns false as soon as something is still in use. Resources released before that point stay
/// released (e.g. the Velox memory manager is reset before the Arrow pool is checked). The null checks below make
/// the function safe to call again; the destructor retries it with back-off.
bool VeloxMemoryManager::tryDestructSafe() {
  // Velox memory pools considered safe to destruct when no alive allocations.
  for (const auto& pool : heldVeloxPools_) {
    if (pool && pool->usedBytes() != 0) {
      return false;
    }
  }
  if (veloxLeafPool_ && veloxLeafPool_->usedBytes() != 0) {
    return false;
  }
  if (veloxAggregatePool_ && veloxAggregatePool_->usedBytes() != 0) {
    return false;
  }
  // Drop this manager's references to the pools; they are released once nothing else references them.
  heldVeloxPools_.clear();
  veloxLeafPool_.reset();
  veloxAggregatePool_.reset();
  // Velox memory manager considered safe to destruct when no alive pools.
  if (veloxMemoryManager_) {
    // Anything beyond the three internal pools verified below is an outstanding pool; retry later.
    if (veloxMemoryManager_->numPools() > 3) {
      VLOG(2) << "Attempt to destruct VeloxMemoryManager failed because there are " << veloxMemoryManager_->numPools()
              << " outstanding memory pools.";
      return false;
    }
    if (veloxMemoryManager_->numPools() == 3) {
      // Assert that the three remaining pools are exactly the spill, cache and trace pools kept under the Velox
      // memory manager's default root.
      // See https://github.com/facebookincubator/velox/commit/e6f84e8ac9ef6721f527a2d552a13f7e79bdf72e
      // https://github.com/facebookincubator/velox/commit/ac134400b5356c5ba3f19facee37884aa020afdc
      int32_t spillPoolCount = 0;
      int32_t cachePoolCount = 0;
      int32_t tracePoolCount = 0;
      veloxMemoryManager_->testingDefaultRoot().visitChildren([&](velox::memory::MemoryPool* child) -> bool {
        if (child == veloxMemoryManager_->spillPool()) {
          spillPoolCount++;
        }
        if (child == veloxMemoryManager_->cachePool()) {
          cachePoolCount++;
        }
        if (child == veloxMemoryManager_->tracePool()) {
          tracePoolCount++;
        }
        return true;
      });
      GLUTEN_CHECK(spillPoolCount == 1, "Illegal pool count state: spillPoolCount: " + std::to_string(spillPoolCount));
      GLUTEN_CHECK(cachePoolCount == 1, "Illegal pool count state: cachePoolCount: " + std::to_string(cachePoolCount));
      GLUTEN_CHECK(tracePoolCount == 1, "Illegal pool count state: tracePoolCount: " + std::to_string(tracePoolCount));
    }
    // Fewer than three pools is treated as an impossible state.
    if (veloxMemoryManager_->numPools() < 3) {
      GLUTEN_CHECK(false, "Unreachable code");
    }
  }
  veloxMemoryManager_.reset();

  // Applies similar rule for Arrow memory pool.
  if (arrowPool_ && arrowPool_->bytes_allocated() != 0) {
    return false;
  }
  arrowPool_.reset();

  // Successfully destructed.
  return true;
}
/// Releases all memory resources. If allocations are still outstanding (e.g. held by asynchronous tasks), it retries
/// tryDestructSafe() with exponential back-off until the time budget given by
/// FLAGS_gluten_velox_async_timeout_on_task_stopping is spent. If resources are still outstanding after that, it
/// logs an error and gives up.
VeloxMemoryManager::~VeloxMemoryManager() {
  // Total wait budget in ms (30s by default). The flag is read on every destruction instead of being frozen in a
  // function-local static. A negative value means "do not wait" rather than wrapping to a huge unsigned timeout.
  const int64_t timeoutFlagMs = FLAGS_gluten_velox_async_timeout_on_task_stopping;
  const uint64_t waitTimeoutMs = timeoutFlagMs > 0 ? static_cast<uint64_t>(timeoutFlagMs) : 0;
  uint64_t accumulatedWaitMs = 0;

  bool destructed = tryDestructSafe();
  for (int32_t tryCount = 0; !destructed && accumulatedWaitMs < waitTimeoutMs; tryCount++) {
    // Exponential back-off: 50ms, 75ms, 112ms, 168ms, ... Multiply before truncating. Truncating pow() first would
    // give 50ms, 50ms, 100ms, 150ms, ...
    uint64_t waitMs = static_cast<uint64_t>(50 * std::pow(1.5, tryCount));
    // Never sleep past the remaining budget.
    const uint64_t remainingMs = waitTimeoutMs - accumulatedWaitMs;
    if (waitMs > remainingMs) {
      waitMs = remainingMs;
    }
    LOG(INFO) << "There are still outstanding Velox memory allocations. Waiting for " << waitMs
              << " ms to let possible async tasks done... ";
    // sleep_for has no upper bound on the duration; POSIX usleep() only specifies arguments below 1,000,000 us.
    std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int64_t>(waitMs)));
    accumulatedWaitMs += waitMs;
    // Re-check after every wait, including the last one, so the final sleep is not wasted.
    destructed = tryDestructSafe();
    if (destructed) {
      LOG(INFO) << "All the outstanding memory resources successfully released. ";
    }
  }
  if (!destructed) {
    LOG(ERROR) << "Failed to release Velox memory manager after " << accumulatedWaitMs
               << "ms as there are still outstanding memory resources. ";
  }
#ifdef ENABLE_JEMALLOC_STATS
  malloc_stats_print(NULL, NULL, NULL);
#endif
}
/// Returns the process-wide memory manager owned by the VeloxBackend singleton.
VeloxMemoryManager* getDefaultMemoryManager() {
  const auto& backend = VeloxBackend::get();
  return backend->getGlobalMemoryManager();
}
std::shared_ptr<velox::memory::MemoryPool> defaultLeafVeloxMemoryPool() {
return getDefaultMemoryManager()->getLeafMemoryPool();
}
} // namespace gluten