/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <filesystem>
#include "VeloxBackend.h"
#include <folly/executors/IOThreadPoolExecutor.h>
#include "operators/functions/RegistrationAllFunctions.h"
#include "operators/plannodes/RowVectorStream.h"
#include "utils/ConfigExtractor.h"
#ifdef GLUTEN_ENABLE_QAT
#include "utils/qat/QatCodec.h"
#endif
#ifdef GLUTEN_ENABLE_IAA
#include "utils/qpl/QplCodec.h"
#endif
#ifdef GLUTEN_ENABLE_GPU
#include "velox/experimental/cudf/exec/ToCudf.h"
#endif
#include "compute/VeloxRuntime.h"
#include "config/VeloxConfig.h"
#include "jni/JniFileSystem.h"
#include "operators/functions/SparkExprToSubfieldFilterParser.h"
#include "udf/UdfLoader.h"
#include "utils/Exception.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" // @manual
#include "velox/dwio/orc/reader/OrcReader.h"
#include "velox/dwio/parquet/RegisterParquetReader.h"
#include "velox/dwio/parquet/RegisterParquetWriter.h"
#include "velox/serializers/PrestoSerializer.h"
DECLARE_bool(velox_exception_user_stacktrace_enabled);
DECLARE_int32(velox_memory_num_shared_leaf_pools);
DECLARE_bool(velox_memory_use_hugepages);
DECLARE_bool(velox_ssd_odirect);
DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks);
DECLARE_int32(cache_prefetch_min_pct);
DECLARE_int32(gluten_velox_async_timeout_on_task_stopping);
DEFINE_int32(gluten_velox_async_timeout_on_task_stopping, 30000, "Async timeout when a task is being stopped");
using namespace facebook;
namespace gluten {
namespace {
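// Factory and releaser callbacks registered with Gluten's MemoryManager
// registry; they create and destroy Velox-backed memory managers for
// kVeloxBackendKind.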
MemoryManager* veloxMemoryManagerFactory(const std::string& kind, std::unique_ptr<AllocationListener> listener) {
return new VeloxMemoryManager(kind, std::move(listener), *VeloxBackend::get()->getBackendConf());
}
void veloxMemoryManagerReleaser(MemoryManager* memoryManager) {
delete memoryManager;
}
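// Creates a VeloxRuntime bound to the given Velox memory manager and the
// per-session configuration.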
Runtime* veloxRuntimeFactory(
const std::string& kind,
MemoryManager* memoryManager,
const std::unordered_map<std::string, std::string>& sessionConf) {
auto* vmm = dynamic_cast<VeloxMemoryManager*>(memoryManager);
GLUTEN_CHECK(vmm != nullptr, "Not a Velox memory manager");
return new VeloxRuntime(kind, vmm, sessionConf);
}
void veloxRuntimeReleaser(Runtime* runtime) {
delete runtime;
}
} // namespace
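// One-time, process-wide initialization of the Velox backend: stores the
// backend config, registers the memory manager and runtime factories,
// configures glog and Velox gflags, registers file systems, connectors,
// serdes and functions, and sets up caching and UDFs.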
void VeloxBackend::init(
std::unique_ptr<AllocationListener> listener,
const std::unordered_map<std::string, std::string>& conf) {
backendConf_ =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(conf));
globalMemoryManager_ = std::make_unique<VeloxMemoryManager>(kVeloxBackendKind, std::move(listener), *backendConf_);
// Register factories.
MemoryManager::registerFactory(kVeloxBackendKind, veloxMemoryManagerFactory, veloxMemoryManagerReleaser);
Runtime::registerFactory(kVeloxBackendKind, veloxRuntimeFactory, veloxRuntimeReleaser);
const bool debugModeEnabled = backendConf_->get<bool>(kDebugModeEnabled, false);
if (debugModeEnabled) {
LOG(INFO) << "VeloxBackend config: " << printConfig(backendConf_->rawConfigs());
}
// Init glog verbosity and severity levels. In debug mode, default to the
// maximum verbosity unless a verbose level is explicitly configured.
if (!debugModeEnabled) {
FLAGS_v = backendConf_->get<uint32_t>(kGlogVerboseLevel, kGlogVerboseLevelDefault);
FLAGS_minloglevel = backendConf_->get<uint32_t>(kGlogSeverityLevel, kGlogSeverityLevelDefault);
} else if (backendConf_->valueExists(kGlogVerboseLevel)) {
FLAGS_v = backendConf_->get<uint32_t>(kGlogVerboseLevel, kGlogVerboseLevelDefault);
} else {
FLAGS_v = kGlogVerboseLevelMaximum;
}
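// Send glog output to stderr and initialize google logging for this process.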
FLAGS_logtostderr = true;
google::InitGoogleLogging("gluten");
// Allow memory pool capacity to be transferred across tasks, e.g. when growing a buffer owned by another task.
FLAGS_velox_memory_pool_capacity_transfer_across_tasks =
backendConf_->get<bool>(kMemoryPoolCapacityTransferAcrossTasks, true);
// Avoid creating too many shared leaf pools.
FLAGS_velox_memory_num_shared_leaf_pools = 0;
// Set velox_exception_user_stacktrace_enabled.
FLAGS_velox_exception_user_stacktrace_enabled =
backendConf_->get<bool>(kEnableUserExceptionStacktrace, kEnableUserExceptionStacktraceDefault);
// Set velox_exception_system_stacktrace_enabled.
FLAGS_velox_exception_system_stacktrace_enabled =
backendConf_->get<bool>(kEnableSystemExceptionStacktrace, kEnableSystemExceptionStacktraceDefault);
// Set velox_memory_use_hugepages.
FLAGS_velox_memory_use_hugepages = backendConf_->get<bool>(kMemoryUseHugePages, kMemoryUseHugePagesDefault);
// Async timeout.
FLAGS_gluten_velox_async_timeout_on_task_stopping =
backendConf_->get<int32_t>(kVeloxAsyncTimeoutOnTaskStopping, kVeloxAsyncTimeoutOnTaskStoppingDefault);
// Set cache_prefetch_min_pct to 0 by default to force all loads to be prefetched in DirectBufferedInput.
FLAGS_cache_prefetch_min_pct = backendConf_->get<int>(kCachePrefetchMinPct, 0);
// Setup and register.
velox::filesystems::registerLocalFileSystem();
#ifdef ENABLE_HDFS
velox::filesystems::registerHdfsFileSystem();
#endif
#ifdef ENABLE_S3
velox::filesystems::registerS3FileSystem();
#endif
#ifdef ENABLE_GCS
velox::filesystems::registerGcsFileSystem();
#endif
#ifdef ENABLE_ABFS
velox::filesystems::registerAbfsFileSystem();
#endif
#ifdef GLUTEN_ENABLE_GPU
FLAGS_velox_cudf_debug = backendConf_->get<bool>(kDebugCudf, kDebugCudfDefault);
if (backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
velox::cudf_velox::registerCudf();
}
#endif
initJolFilesystem();
initConnector();
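// Register file sinks, the Parquet reader/writer factories, the ORC reader
// factory, and the Spark-compatible expression-to-subfield-filter parser.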
velox::dwio::common::registerFileSinks();
velox::parquet::registerParquetReaderFactory();
velox::parquet::registerParquetWriterFactory();
velox::orc::registerOrcReaderFactory();
velox::exec::ExprToSubfieldFilterParser::registerParserFactory(
[]() { return std::make_shared<SparkExprToSubfieldFilterParser>(); });
// Register Velox functions
registerAllFunctions();
if (!facebook::velox::isRegisteredVectorSerde()) {
// serde, for spill
facebook::velox::serializer::presto::PrestoVectorSerde::registerVectorSerde();
}
if (!isRegisteredNamedVectorSerde(facebook::velox::VectorSerde::Kind::kPresto)) {
// RSS shuffle serde.
facebook::velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde();
}
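// Register the custom operator translator for RowVectorStream plan nodes.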
velox::exec::Operator::registerOperator(std::make_unique<RowVectorStreamOperatorTranslator>());
initUdf();
// Initialize the Velox-side memory manager for the current process. The memory
// manager is used during spill calls, so we don't track it with Spark off-heap
// memory; instead we rely on overhead memory. If we tracked it with off-heap
// memory, recursive reservations from the Spark off-heap memory pool would
// cause unexpected OOMs.
auto sparkOverhead = backendConf_->get<int64_t>(kSparkOverheadMemory);
int64_t memoryManagerCapacity;
if (sparkOverhead.hasValue()) {
// 0.75 * total overhead memory is used for Velox global memory manager.
// FIXME: Make this configurable.
memoryManagerCapacity = sparkOverhead.value() * 0.75;
} else {
memoryManagerCapacity = facebook::velox::memory::kMaxMemory;
}
LOG(INFO) << "Setting global Velox memory manager with capacity: " << memoryManagerCapacity;
facebook::velox::memory::initializeMemoryManager({.allocatorCapacity = memoryManagerCapacity});
// Local cache persistence relies on the cache pool under the root memory pool,
// so this must be initialized after the memory manager is instantiated.
initCache();
}
facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
return asyncDataCache_.get();
}
// JNI-or-local filesystem, used for spilling to the JVM heap when extra heap space is available.
void VeloxBackend::initJolFilesystem() {
int64_t maxSpillFileSize = backendConf_->get<int64_t>(kMaxSpillFileSize, kMaxSpillFileSizeDefault);
// FIXME: It's known that if spill compression is disabled, the actual spill
// file size may increase slightly beyond this limit (by at most 64 rows, which
// is one compression page by default).
registerJolFileSystem(maxSpillFileSize);
}
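// Builds an SsdCache from the backend config: cache path, shard count, I/O
// executor, checkpointing and checksum settings. Fails if the target
// filesystem lacks enough free space for the configured cache size.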
std::unique_ptr<facebook::velox::cache::SsdCache> VeloxBackend::initSsdCache(uint64_t ssdCacheSize) {
FLAGS_velox_ssd_odirect = backendConf_->get<bool>(kVeloxSsdODirectEnabled, false);
int32_t ssdCacheShards = backendConf_->get<int32_t>(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault);
int32_t ssdCacheIOThreads = backendConf_->get<int32_t>(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault);
std::string ssdCachePathPrefix = backendConf_->get<std::string>(kVeloxSsdCachePath, kVeloxSsdCachePathDefault);
uint64_t ssdCheckpointIntervalSize = backendConf_->get<uint64_t>(kVeloxSsdCheckpointIntervalBytes, 0);
bool disableFileCow = backendConf_->get<bool>(kVeloxSsdDisableFileCow, false);
bool checksumEnabled = backendConf_->get<bool>(kVeloxSsdCheckSumEnabled, false);
bool checksumReadVerificationEnabled = backendConf_->get<bool>(kVeloxSsdCheckSumReadVerificationEnabled, false);
cachePathPrefix_ = ssdCachePathPrefix;
cacheFilePrefix_ = getCacheFilePrefix();
std::string ssdCachePath = ssdCachePathPrefix + "/" + cacheFilePrefix_;
ssdCacheExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ssdCacheIOThreads);
const cache::SsdCache::Config config(
ssdCachePath,
ssdCacheSize,
ssdCacheShards,
ssdCacheExecutor_.get(),
ssdCheckpointIntervalSize,
disableFileCow,
checksumEnabled,
checksumReadVerificationEnabled);
auto ssd = std::make_unique<velox::cache::SsdCache>(config);
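// Use the non-throwing overload of std::filesystem::space(); on failure it
// reports all fields as static_cast<uintmax_t>(-1) (the maximum value), so the
// check below is effectively skipped rather than throwing.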
std::error_code ec;
const std::filesystem::space_info si = std::filesystem::space(ssdCachePathPrefix, ec);
if (si.available < ssdCacheSize) {
VELOX_FAIL(
"Not enough space for SSD cache at " + ssdCachePath + ". Cache size: " + std::to_string(ssdCacheSize) +
", free space: " + std::to_string(si.available));
}
LOG(INFO) << "Initializing SSD cache with: " << config.toString();
return ssd;
}
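// Sets up the AsyncDataCache on top of an MmapAllocator when Velox caching is
// enabled; adds an SSD tier when a non-zero SSD cache size is configured.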
void VeloxBackend::initCache() {
if (backendConf_->get<bool>(kVeloxCacheEnabled, false)) {
uint64_t memCacheSize = backendConf_->get<uint64_t>(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault);
uint64_t ssdCacheSize = backendConf_->get<uint64_t>(kVeloxSsdCacheSize, kVeloxSsdCacheSizeDefault);
velox::memory::MmapAllocator::Options options;
options.capacity = memCacheSize;
cacheAllocator_ = std::make_shared<velox::memory::MmapAllocator>(options);
if (ssdCacheSize == 0) {
LOG(INFO) << "AsyncDataCache will do memory caching only as ssd cache size is 0";
// TODO: this is not tracked by Spark.
asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get());
} else {
// TODO: this is not tracked by Spark.
auto ssd = initSsdCache(ssdCacheSize);
asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get(), std::move(ssd));
}
VELOX_CHECK_NOT_NULL(dynamic_cast<velox::cache::AsyncDataCache*>(asyncDataCache_.get()));
LOG(INFO) << "AsyncDataCache is ready";
}
}
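// Registers the Hive connector; when the configured I/O thread count is
// positive, backs it with a dedicated IOThreadPoolExecutor.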
void VeloxBackend::initConnector() {
auto hiveConf = getHiveConfig(backendConf_);
auto ioThreads = backendConf_->get<int32_t>(kVeloxIOThreads, kVeloxIOThreadsDefault);
GLUTEN_CHECK(
ioThreads >= 0,
kVeloxIOThreads + " was set to negative number " + std::to_string(ioThreads) + ", this should not happen.");
if (ioThreads > 0) {
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
}
velox::connector::registerConnector(
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, hiveConf, ioExecutor_.get()));
}
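// Loads and registers user-defined functions from the configured library paths, if any.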
void VeloxBackend::initUdf() {
auto got = backendConf_->get<std::string>(kVeloxUdfLibraryPaths, "");
if (!got.empty()) {
auto udfLoader = UdfLoader::getInstance();
udfLoader->loadUdfLibraries(got);
udfLoader->registerUdf();
}
}
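// Process-wide singleton; populated by VeloxBackend::create() and accessed via
// VeloxBackend::get().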
std::unique_ptr<VeloxBackend> VeloxBackend::instance_ = nullptr;
void VeloxBackend::create(
std::unique_ptr<AllocationListener> listener,
const std::unordered_map<std::string, std::string>& conf) {
instance_ = std::unique_ptr<VeloxBackend>(new VeloxBackend(std::move(listener), conf));
}
VeloxBackend* VeloxBackend::get() {
if (!instance_) {
LOG(WARNING) << "VeloxBackend instance is null, please invoke VeloxBackend#create before use.";
throw GlutenException("VeloxBackend instance is null.");
}
return instance_.get();
}
} // namespace gluten