cpp/velox/compute/VeloxBackend.cc (233 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <filesystem>
#include "VeloxBackend.h"
#include <folly/executors/IOThreadPoolExecutor.h>
#include "operators/functions/RegistrationAllFunctions.h"
#include "operators/plannodes/RowVectorStream.h"
#include "utils/ConfigExtractor.h"
#include "shuffle/VeloxShuffleReader.h"
#ifdef GLUTEN_ENABLE_QAT
#include "utils/qat/QatCodec.h"
#endif
#ifdef GLUTEN_ENABLE_IAA
#include "utils/qpl/qpl_codec.h"
#endif
#include "compute/VeloxRuntime.h"
#include "config/GlutenConfig.h"
#include "jni/JniFileSystem.h"
#include "operators/functions/SparkTokenizer.h"
#include "udf/UdfLoader.h"
#include "utils/exception.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/serializers/PrestoSerializer.h"
DECLARE_bool(velox_exception_user_stacktrace_enabled);
DECLARE_int32(velox_memory_num_shared_leaf_pools);
DECLARE_bool(velox_memory_use_hugepages);
DECLARE_int32(cache_prefetch_min_pct);
DECLARE_int32(gluten_velox_aysnc_timeout_on_task_stopping);
DEFINE_int32(gluten_velox_aysnc_timeout_on_task_stopping, 30000, "Aysnc timout when task is being stopped");
using namespace facebook;
namespace {
const std::string kEnableUserExceptionStacktrace =
"spark.gluten.sql.columnar.backend.velox.enableUserExceptionStacktrace";
const bool kEnableUserExceptionStacktraceDefault = true;
const std::string kGlogVerboseLevel = "spark.gluten.sql.columnar.backend.velox.glogVerboseLevel";
const uint32_t kGlogVerboseLevelDefault = 0;
const std::string kGlogSeverityLevel = "spark.gluten.sql.columnar.backend.velox.glogSeverityLevel";
const uint32_t kGlogSeverityLevelDefault = 1;
const std::string kEnableSystemExceptionStacktrace =
"spark.gluten.sql.columnar.backend.velox.enableSystemExceptionStacktrace";
const bool kEnableSystemExceptionStacktraceDefault = true;
const std::string kMemoryUseHugePages = "spark.gluten.sql.columnar.backend.velox.memoryUseHugePages";
const bool kMemoryUseHugePagesDefault = false;
const std::string kHiveConnectorId = "test-hive";
const std::string kVeloxCacheEnabled = "spark.gluten.sql.columnar.backend.velox.cacheEnabled";
// memory cache
const std::string kVeloxMemCacheSize = "spark.gluten.sql.columnar.backend.velox.memCacheSize";
const uint64_t kVeloxMemCacheSizeDefault = 1073741824; // 1G
// ssd cache
const std::string kVeloxSsdCacheSize = "spark.gluten.sql.columnar.backend.velox.ssdCacheSize";
const uint64_t kVeloxSsdCacheSizeDefault = 1073741824; // 1G
const std::string kVeloxSsdCachePath = "spark.gluten.sql.columnar.backend.velox.ssdCachePath";
const std::string kVeloxSsdCachePathDefault = "/tmp/";
const std::string kVeloxSsdCacheShards = "spark.gluten.sql.columnar.backend.velox.ssdCacheShards";
const uint32_t kVeloxSsdCacheShardsDefault = 1;
const std::string kVeloxSsdCacheIOThreads = "spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads";
const uint32_t kVeloxSsdCacheIOThreadsDefault = 1;
const std::string kVeloxSsdODirectEnabled = "spark.gluten.sql.columnar.backend.velox.ssdODirect";
// async
const std::string kVeloxIOThreads = "spark.gluten.sql.columnar.backend.velox.IOThreads";
const uint32_t kVeloxIOThreadsDefault = 0;
const std::string kVeloxAsyncTimeoutOnTaskStopping =
"spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping";
const int32_t kVeloxAsyncTimeoutOnTaskStoppingDefault = 30000; // 30s
// udf
const std::string kVeloxUdfLibraryPaths = "spark.gluten.sql.columnar.backend.velox.udfLibraryPaths";
// spill
const std::string kMaxSpillFileSize = "spark.gluten.sql.columnar.backend.velox.maxSpillFileSize";
const uint64_t kMaxSpillFileSizeDefault = 20L * 1024 * 1024;
// backtrace allocation
const std::string kBacktraceAllocation = "spark.gluten.backtrace.allocation";
// VeloxShuffleReader print flag.
const std::string kVeloxShuffleReaderPrintFlag = "spark.gluten.velox.shuffleReaderPrintFlag";
const std::string kVeloxFileHandleCacheEnabled = "spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled";
const bool kVeloxFileHandleCacheEnabledDefault = false;
/* configs for file read in velox*/
const std::string kDirectorySizeGuess = "spark.gluten.sql.columnar.backend.velox.directorySizeGuess";
const std::string kFilePreloadThreshold = "spark.gluten.sql.columnar.backend.velox.filePreloadThreshold";
const std::string kPrefetchRowGroups = "spark.gluten.sql.columnar.backend.velox.prefetchRowGroups";
const std::string kLoadQuantum = "spark.gluten.sql.columnar.backend.velox.loadQuantum";
const std::string kMaxCoalescedDistanceBytes = "spark.gluten.sql.columnar.backend.velox.maxCoalescedDistanceBytes";
const std::string kMaxCoalescedBytes = "spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes";
const std::string kCachePrefetchMinPct = "spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct";
} // namespace
namespace gluten {
namespace {
gluten::Runtime* veloxRuntimeFactory(const std::unordered_map<std::string, std::string>& sessionConf) {
return new gluten::VeloxRuntime(sessionConf);
}
} // namespace
void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf) {
// Register Velox runtime factory
gluten::Runtime::registerFactory(gluten::kVeloxRuntimeKind, veloxRuntimeFactory);
// Init glog and log level.
std::shared_ptr<const facebook::velox::Config> veloxcfg =
std::make_shared<facebook::velox::core::MemConfigMutable>(conf);
if (veloxcfg->get<bool>(kDebugModeEnabled, false)) {
LOG(INFO) << "VeloxBackend config:" << printConfig(veloxcfg->valuesCopy());
}
if (!veloxcfg->get<bool>(kDebugModeEnabled, false)) {
uint32_t vlogLevel = veloxcfg->get<uint32_t>(kGlogVerboseLevel, kGlogVerboseLevelDefault);
FLAGS_v = vlogLevel;
uint32_t severityLogLevel = veloxcfg->get<uint32_t>(kGlogSeverityLevel, kGlogSeverityLevelDefault);
FLAGS_minloglevel = severityLogLevel;
} else {
FLAGS_v = 99;
}
FLAGS_logtostderr = true;
google::InitGoogleLogging("gluten");
// Avoid creating too many shared leaf pools.
FLAGS_velox_memory_num_shared_leaf_pools = 0;
// Set velox_exception_user_stacktrace_enabled.
FLAGS_velox_exception_user_stacktrace_enabled =
veloxcfg->get<bool>(kEnableUserExceptionStacktrace, kEnableUserExceptionStacktraceDefault);
// Set velox_exception_system_stacktrace_enabled.
FLAGS_velox_exception_system_stacktrace_enabled =
veloxcfg->get<bool>(kEnableSystemExceptionStacktrace, kEnableSystemExceptionStacktraceDefault);
// Set velox_memory_use_hugepages.
FLAGS_velox_memory_use_hugepages = veloxcfg->get<bool>(kMemoryUseHugePages, kMemoryUseHugePagesDefault);
// Async timeout.
FLAGS_gluten_velox_aysnc_timeout_on_task_stopping =
veloxcfg->get<int32_t>(kVeloxAsyncTimeoutOnTaskStopping, kVeloxAsyncTimeoutOnTaskStoppingDefault);
// Set backtrace_allocation
gluten::backtrace_allocation = veloxcfg->get<bool>(kBacktraceAllocation, false);
// Setup and register.
velox::filesystems::registerLocalFileSystem();
initJolFilesystem(veloxcfg);
initCache(veloxcfg);
initConnector(veloxcfg);
// Register Velox functions
registerAllFunctions();
if (!facebook::velox::isRegisteredVectorSerde()) {
// serde, for spill
facebook::velox::serializer::presto::PrestoVectorSerde::registerVectorSerde();
}
velox::exec::Operator::registerOperator(std::make_unique<RowVectorStreamOperatorTranslator>());
initUdf(veloxcfg);
registerSparkTokenizer();
// initialize the global memory manager for current process
facebook::velox::memory::MemoryManager::initialize({});
}
facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
return asyncDataCache_.get();
}
// JNI-or-local filesystem, for spilling-to-heap if we have extra JVM heap spaces
void VeloxBackend::initJolFilesystem(const std::shared_ptr<const facebook::velox::Config>& conf) {
int64_t maxSpillFileSize = conf->get<int64_t>(kMaxSpillFileSize, kMaxSpillFileSizeDefault);
// FIXME It's known that if spill compression is disabled, the actual spill file size may
// in crease beyond this limit a little (maximum 64 rows which is by default
// one compression page)
gluten::registerJolFileSystem(maxSpillFileSize);
}
void VeloxBackend::initCache(const std::shared_ptr<const facebook::velox::Config>& conf) {
bool veloxCacheEnabled = conf->get<bool>(kVeloxCacheEnabled, false);
if (veloxCacheEnabled) {
FLAGS_ssd_odirect = true;
FLAGS_ssd_odirect = conf->get<bool>(kVeloxSsdODirectEnabled, false);
uint64_t memCacheSize = conf->get<uint64_t>(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault);
uint64_t ssdCacheSize = conf->get<uint64_t>(kVeloxSsdCacheSize, kVeloxSsdCacheSizeDefault);
int32_t ssdCacheShards = conf->get<int32_t>(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault);
int32_t ssdCacheIOThreads = conf->get<int32_t>(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault);
std::string ssdCachePathPrefix = conf->get<std::string>(kVeloxSsdCachePath, kVeloxSsdCachePathDefault);
cachePathPrefix_ = ssdCachePathPrefix;
cacheFilePrefix_ = getCacheFilePrefix();
std::string ssdCachePath = ssdCachePathPrefix + "/" + cacheFilePrefix_;
ssdCacheExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ssdCacheIOThreads);
auto ssd =
std::make_unique<velox::cache::SsdCache>(ssdCachePath, ssdCacheSize, ssdCacheShards, ssdCacheExecutor_.get());
std::error_code ec;
const std::filesystem::space_info si = std::filesystem::space(ssdCachePathPrefix, ec);
if (si.available < ssdCacheSize) {
VELOX_FAIL(
"not enough space for ssd cache in " + ssdCachePath + " cache size: " + std::to_string(ssdCacheSize) +
"free space: " + std::to_string(si.available))
}
velox::memory::MmapAllocator::Options options;
options.capacity = memCacheSize;
cacheAllocator_ = std::make_shared<velox::memory::MmapAllocator>(options);
if (ssdCacheSize == 0) {
LOG(INFO) << "AsyncDataCache will do memory caching only as ssd cache size is 0";
// TODO: this is not tracked by Spark.
asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get());
} else {
// TODO: this is not tracked by Spark.
asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get(), std::move(ssd));
}
VELOX_CHECK_NOT_NULL(dynamic_cast<velox::cache::AsyncDataCache*>(asyncDataCache_.get()))
LOG(INFO) << "STARTUP: Using AsyncDataCache memory cache size: " << memCacheSize
<< ", ssdCache prefix: " << ssdCachePath << ", ssdCache size: " << ssdCacheSize
<< ", ssdCache shards: " << ssdCacheShards << ", ssdCache IO threads: " << ssdCacheIOThreads;
}
}
void VeloxBackend::initConnector(const std::shared_ptr<const facebook::velox::Config>& conf) {
int32_t ioThreads = conf->get<int32_t>(kVeloxIOThreads, kVeloxIOThreadsDefault);
auto mutableConf = std::make_shared<facebook::velox::core::MemConfigMutable>(conf->valuesCopy());
auto hiveConf = getHiveConfig(conf);
for (auto& [k, v] : hiveConf->valuesCopy()) {
mutableConf->setValue(k, v);
}
#ifdef ENABLE_ABFS
const auto& confValue = conf->valuesCopy();
for (auto& [k, v] : confValue) {
if (k.find("fs.azure.account.key") == 0) {
mutableConf->setValue(k, v);
} else if (k.find("spark.hadoop.fs.azure.account.key") == 0) {
constexpr int32_t accountKeyPrefixLength = 13;
mutableConf->setValue(k.substr(accountKeyPrefixLength), v);
}
}
#endif
mutableConf->setValue(
velox::connector::hive::HiveConfig::kEnableFileHandleCache,
conf->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false");
mutableConf->setValue(
velox::connector::hive::HiveConfig::kMaxCoalescedBytes,
conf->get<std::string>(kMaxCoalescedBytes, "67108864")); // 64M
mutableConf->setValue(
velox::connector::hive::HiveConfig::kMaxCoalescedDistanceBytes,
conf->get<std::string>(kMaxCoalescedDistanceBytes, "1048576")); // 1M
mutableConf->setValue(
velox::connector::hive::HiveConfig::kPrefetchRowGroups, conf->get<std::string>(kPrefetchRowGroups, "1"));
mutableConf->setValue(
velox::connector::hive::HiveConfig::kLoadQuantum, conf->get<std::string>(kLoadQuantum, "268435456")); // 256M
mutableConf->setValue(
velox::connector::hive::HiveConfig::kFooterEstimatedSize,
conf->get<std::string>(kDirectorySizeGuess, "32768")); // 32K
mutableConf->setValue(
velox::connector::hive::HiveConfig::kFilePreloadThreshold,
conf->get<std::string>(kFilePreloadThreshold, "1048576")); // 1M
// set cache_prefetch_min_pct default as 0 to force all loads are prefetched in DirectBufferInput.
FLAGS_cache_prefetch_min_pct = conf->get<int>(kCachePrefetchMinPct, 0);
if (ioThreads > 0) {
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
}
velox::connector::registerConnector(std::make_shared<velox::connector::hive::HiveConnector>(
kHiveConnectorId,
std::make_shared<facebook::velox::core::MemConfig>(mutableConf->valuesCopy()),
ioExecutor_.get()));
}
void VeloxBackend::initUdf(const std::shared_ptr<const facebook::velox::Config>& conf) {
auto got = conf->get<std::string>(kVeloxUdfLibraryPaths, "");
if (!got.empty()) {
auto udfLoader = gluten::UdfLoader::getInstance();
udfLoader->loadUdfLibraries(got);
udfLoader->registerUdf();
}
}
std::unique_ptr<VeloxBackend> VeloxBackend::instance_ = nullptr;
void VeloxBackend::create(const std::unordered_map<std::string, std::string>& conf) {
instance_ = std::unique_ptr<VeloxBackend>(new gluten::VeloxBackend(conf));
}
VeloxBackend* VeloxBackend::get() {
if (!instance_) {
LOG(WARNING) << "VeloxBackend instance is null, please invoke VeloxBackend#create before use.";
throw GlutenException("VeloxBackend instance is null.");
}
return instance_.get();
}
} // namespace gluten