cpp/velox/compute/VeloxRuntime.cc

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "VeloxRuntime.h"

#include <algorithm>
#include <filesystem>

#include "VeloxBackend.h"
#include "compute/ResultIterator.h"
#include "compute/Runtime.h"
#include "compute/VeloxPlanConverter.h"
#include "config/VeloxConfig.h"
#include "operators/serializer/VeloxRowToColumnarConverter.h"
#include "operators/writer/VeloxColumnarBatchWriter.h"
#include "shuffle/VeloxShuffleReader.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "utils/ConfigExtractor.h"
#include "utils/VeloxArrowUtils.h"
#include "utils/VeloxWholeStageDumper.h"

DECLARE_bool(velox_exception_user_stacktrace_enabled);
DECLARE_bool(velox_memory_use_hugepages);
DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks);

#ifdef ENABLE_HDFS
#include "operators/writer/VeloxParquetDataSourceHDFS.h"
#endif

#ifdef ENABLE_S3
#include "operators/writer/VeloxParquetDataSourceS3.h"
#endif

#ifdef ENABLE_GCS
#include "operators/writer/VeloxParquetDataSourceGCS.h"
#endif

#ifdef ENABLE_ABFS
#include "operators/writer/VeloxParquetDataSourceABFS.h"
#endif

using namespace facebook;

namespace gluten {
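// The constructor below copies the per-session conf map into a Velox
// ConfigBase and mirrors selected entries into process-wide gflags. A
// minimal sketch of the lookup-with-fallback pattern it relies on (the
// key name here is a hypothetical placeholder, not a real Gluten key):
//
//   facebook::velox::config::ConfigBase cfg(
//       std::unordered_map<std::string, std::string>{{"spark.gluten.example.flag", "true"}});
//   bool enabled = cfg.get<bool>("spark.gluten.example.flag", /*defaultValue=*/false);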
VeloxRuntime::VeloxRuntime(
    const std::string& kind,
    VeloxMemoryManager* vmm,
    const std::unordered_map<std::string, std::string>& confMap)
    : Runtime(kind, vmm, confMap) {
  // Refresh session config.
  veloxCfg_ =
      std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap_));
  debugModeEnabled_ = veloxCfg_->get<bool>(kDebugModeEnabled, false);
  FLAGS_minloglevel = veloxCfg_->get<uint32_t>(kGlogSeverityLevel, FLAGS_minloglevel);
  FLAGS_v = veloxCfg_->get<uint32_t>(kGlogVerboseLevel, FLAGS_v);
  FLAGS_velox_exception_user_stacktrace_enabled =
      veloxCfg_->get<bool>(kEnableUserExceptionStacktrace, FLAGS_velox_exception_user_stacktrace_enabled);
  FLAGS_velox_exception_system_stacktrace_enabled =
      veloxCfg_->get<bool>(kEnableSystemExceptionStacktrace, FLAGS_velox_exception_system_stacktrace_enabled);
  FLAGS_velox_memory_use_hugepages = veloxCfg_->get<bool>(kMemoryUseHugePages, FLAGS_velox_memory_use_hugepages);
  FLAGS_velox_memory_pool_capacity_transfer_across_tasks = veloxCfg_->get<bool>(
      kMemoryPoolCapacityTransferAcrossTasks, FLAGS_velox_memory_pool_capacity_transfer_across_tasks);
}

void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size) {
  if (debugModeEnabled_ || dumper_ != nullptr) {
    try {
      auto planJson = substraitFromPbToJson("Plan", data, size);
      if (dumper_ != nullptr) {
        dumper_->dumpPlan(planJson);
      }
      LOG_IF(INFO, debugModeEnabled_ && taskInfo_.has_value())
          << std::string(50, '#') << " received substrait::Plan: " << taskInfo_.value() << std::endl
          << planJson;
    } catch (const std::exception& e) {
      LOG(WARNING) << "Error converting Substrait plan to JSON: " << e.what();
    }
  }
  GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_), "Parse substrait plan failed");
}

void VeloxRuntime::parseSplitInfo(const uint8_t* data, int32_t size, int32_t splitIndex) {
  if (debugModeEnabled_ || dumper_ != nullptr) {
    try {
      auto splitJson = substraitFromPbToJson("ReadRel.LocalFiles", data, size);
      if (dumper_ != nullptr) {
        dumper_->dumpInputSplit(splitIndex, splitJson);
      }
      LOG_IF(INFO, debugModeEnabled_ && taskInfo_.has_value())
          << std::string(50, '#') << " received substrait::ReadRel.LocalFiles: " << taskInfo_.value() << std::endl
          << splitJson;
    } catch (const std::exception& e) {
      LOG(WARNING) << "Error converting Substrait split info to JSON: " << e.what();
    }
  }
  ::substrait::ReadRel_LocalFiles localFile;
  GLUTEN_CHECK(parseProtobuf(data, size, &localFile), "Parse substrait split info failed");
  localFiles_.push_back(localFile);
}

void VeloxRuntime::getInfoAndIds(
    const std::unordered_map<velox::core::PlanNodeId, std::shared_ptr<SplitInfo>>& splitInfoMap,
    const std::unordered_set<velox::core::PlanNodeId>& leafPlanNodeIds,
    std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
    std::vector<velox::core::PlanNodeId>& scanIds,
    std::vector<velox::core::PlanNodeId>& streamIds) {
  for (const auto& leafPlanNodeId : leafPlanNodeIds) {
    auto it = splitInfoMap.find(leafPlanNodeId);
    if (it == splitInfoMap.end()) {
      throw std::runtime_error("Could not find leafPlanNodeId.");
    }
    auto splitInfo = it->second;
    if (splitInfo->isStream) {
      streamIds.emplace_back(leafPlanNodeId);
    } else {
      scanInfos.emplace_back(splitInfo);
      scanIds.emplace_back(leafPlanNodeId);
    }
  }
}

std::string VeloxRuntime::planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) {
  std::vector<std::shared_ptr<ResultIterator>> inputs;
  auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool();
  VeloxPlanConverter veloxPlanConverter(inputs, veloxMemoryPool.get(), sessionConf, std::nullopt, true);
  auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, localFiles_);
  return veloxPlan->toString(details, true);
}
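// Illustrative call order for the parsing entry points above (a sketch;
// in practice the serialized Substrait bytes come from the JVM side, and
// `planBytes` / `splitBytes` are hypothetical names):
//
//   runtime.parsePlan(planBytes.data(), planBytes.size());
//   for (int32_t i = 0; i < static_cast<int32_t>(splitBytes.size()); ++i) {
//     runtime.parseSplitInfo(splitBytes[i].data(), splitBytes[i].size(), i);
//   }
//   LOG(INFO) << runtime.planString(/*details=*/true, sessionConf);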
VeloxMemoryManager* VeloxRuntime::memoryManager() {
  auto vmm = dynamic_cast<VeloxMemoryManager*>(memoryManager_);
  GLUTEN_CHECK(vmm != nullptr, "Not a Velox memory manager");
  return vmm;
}

std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
    const std::string& spillDir,
    const std::vector<std::shared_ptr<ResultIterator>>& inputs,
    const std::unordered_map<std::string, std::string>& sessionConf) {
  LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" << printConfig(confMap_);

  VeloxPlanConverter veloxPlanConverter(
      inputs, memoryManager()->getLeafMemoryPool().get(), sessionConf, *localWriteFilesTempPath());
  veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_));

  // A scan node may be required.
  std::vector<std::shared_ptr<SplitInfo>> scanInfos;
  std::vector<velox::core::PlanNodeId> scanIds;
  std::vector<velox::core::PlanNodeId> streamIds;

  // Separate the scan ids and stream ids, and get the scan infos.
  getInfoAndIds(veloxPlanConverter.splitInfos(), veloxPlan_->leafPlanNodeIds(), scanInfos, scanIds, streamIds);

  auto wholeStageIter = std::make_unique<WholeStageResultIterator>(
      memoryManager(),
      veloxPlan_,
      scanIds,
      scanInfos,
      streamIds,
      spillDir,
      sessionConf,
      taskInfo_.has_value() ? taskInfo_.value() : SparkTaskInfo{});
  return std::make_shared<ResultIterator>(std::move(wholeStageIter), this);
}

std::shared_ptr<ColumnarToRowConverter> VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) {
  auto veloxPool = memoryManager()->getLeafMemoryPool();
  return std::make_shared<VeloxColumnarToRowConverter>(veloxPool, column2RowMemThreshold);
}

std::shared_ptr<ColumnarBatch> VeloxRuntime::createOrGetEmptySchemaBatch(int32_t numRows) {
  auto& lookup = emptySchemaBatchLoopUp_;
  if (lookup.find(numRows) == lookup.end()) {
    auto veloxPool = memoryManager()->getLeafMemoryPool();
    const std::shared_ptr<VeloxColumnarBatch>& batch =
        VeloxColumnarBatch::from(veloxPool.get(), gluten::createZeroColumnBatch(numRows));
    lookup.emplace(numRows, batch); // The batch will be released after the Spark task ends.
  }
  return lookup.at(numRows);
}

std::shared_ptr<ColumnarBatch> VeloxRuntime::select(
    std::shared_ptr<ColumnarBatch> batch,
    const std::vector<int32_t>& columnIndices) {
  auto veloxPool = memoryManager()->getLeafMemoryPool();
  auto veloxBatch = gluten::VeloxColumnarBatch::from(veloxPool.get(), batch);
  auto outputBatch = veloxBatch->select(veloxPool.get(), columnIndices);
  return outputBatch;
}

std::shared_ptr<RowToColumnarConverter> VeloxRuntime::createRow2ColumnarConverter(struct ArrowSchema* cSchema) {
  auto veloxPool = memoryManager()->getLeafMemoryPool();
  return std::make_shared<VeloxRowToColumnarConverter>(cSchema, veloxPool);
}

std::shared_ptr<ShuffleWriter> VeloxRuntime::createShuffleWriter(
    int numPartitions,
    std::unique_ptr<PartitionWriter> partitionWriter,
    ShuffleWriterOptions options) {
  auto veloxPool = memoryManager()->getLeafMemoryPool();
  auto arrowPool = memoryManager()->getArrowMemoryPool();
  GLUTEN_ASSIGN_OR_THROW(
      std::shared_ptr<ShuffleWriter> shuffleWriter,
      VeloxShuffleWriter::create(
          options.shuffleWriterType,
          numPartitions,
          std::move(partitionWriter),
          std::move(options),
          veloxPool,
          arrowPool));
  return shuffleWriter;
}
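// Hypothetical usage sketch for createShuffleWriter() above; only the
// field this file actually consumes is shown, and the PartitionWriter
// construction is elided:
//
//   ShuffleWriterOptions options;
//   options.shuffleWriterType = ...; // elided; see ShuffleWriterOptions
//   auto shuffleWriter = runtime.createShuffleWriter(
//       /*numPartitions=*/200, std::move(partitionWriter), std::move(options));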
std::shared_ptr<VeloxDataSource> VeloxRuntime::createDataSource(
    const std::string& filePath,
    std::shared_ptr<arrow::Schema> schema) {
  static std::atomic_uint32_t id{0UL};
  auto veloxPool = memoryManager()->getAggregateMemoryPool()->addAggregateChild("datasource." + std::to_string(id++));
  // Pass a dedicated pool for the S3 and GCS sinks, as they can't share veloxPool with the parquet writer.
  // FIXME: Check file formats?
  auto sinkPool = memoryManager()->getLeafMemoryPool();
  if (isSupportedHDFSPath(filePath)) {
#ifdef ENABLE_HDFS
    return std::make_shared<VeloxParquetDataSourceHDFS>(filePath, veloxPool, sinkPool, schema);
#else
    throw std::runtime_error(
        "The write path is an HDFS path, but HDFS support wasn't enabled when writing parquet data in the Velox runtime!");
#endif
  } else if (isSupportedS3SdkPath(filePath)) {
#ifdef ENABLE_S3
    return std::make_shared<VeloxParquetDataSourceS3>(filePath, veloxPool, sinkPool, schema);
#else
    throw std::runtime_error(
        "The write path is an S3 path, but S3 support wasn't enabled when writing parquet data in the Velox runtime!");
#endif
  } else if (isSupportedGCSPath(filePath)) {
#ifdef ENABLE_GCS
    return std::make_shared<VeloxParquetDataSourceGCS>(filePath, veloxPool, sinkPool, schema);
#else
    throw std::runtime_error(
        "The write path is a GCS path, but GCS support wasn't enabled when writing parquet data in the Velox runtime!");
#endif
  } else if (isSupportedABFSPath(filePath)) {
#ifdef ENABLE_ABFS
    return std::make_shared<VeloxParquetDataSourceABFS>(filePath, veloxPool, sinkPool, schema);
#else
    throw std::runtime_error(
        "The write path is an ABFS path, but ABFS support wasn't enabled when writing parquet data in the Velox runtime!");
#endif
  }
  return std::make_shared<VeloxParquetDataSource>(filePath, veloxPool, sinkPool, schema);
}

std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
    std::shared_ptr<arrow::Schema> schema,
    ShuffleReaderOptions options) {
  auto codec = gluten::createArrowIpcCodec(options.compressionType, options.codecBackend);
  const auto veloxCompressionKind = arrowCompressionTypeToVelox(options.compressionType);
  const auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
  auto deserializerFactory = std::make_unique<gluten::VeloxShuffleReaderDeserializerFactory>(
      schema,
      std::move(codec),
      veloxCompressionKind,
      rowType,
      options.batchSize,
      options.readerBufferSize,
      options.deserializerBufferSize,
      memoryManager()->getArrowMemoryPool(),
      memoryManager()->getLeafMemoryPool(),
      options.shuffleWriterType);
  return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
}

std::unique_ptr<ColumnarBatchSerializer> VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) {
  auto arrowPool = memoryManager()->getArrowMemoryPool();
  auto veloxPool = memoryManager()->getLeafMemoryPool();
  return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool, cSchema);
}

void VeloxRuntime::enableDumping() {
  auto saveDir = veloxCfg_->get<std::string>(kGlutenSaveDir);
  GLUTEN_CHECK(saveDir.has_value(), kGlutenSaveDir + " is not set");
  auto taskInfo = getSparkTaskInfo();
  GLUTEN_CHECK(taskInfo.has_value(), "Task info is not set. Please set task info before enabling dumping.");
  dumper_ = std::make_shared<VeloxWholeStageDumper>(
      taskInfo.value(),
      saveDir.value(),
      veloxCfg_->get<int64_t>(kSparkBatchSize, 4096),
      memoryManager()->getAggregateMemoryPool().get());
  dumper_->dumpConf(getConfMap());
}

} // namespace gluten
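// Usage note for enableDumping() above, as a sketch: the conf must carry
// kGlutenSaveDir, and the Spark task info must already be attached to the
// runtime (the setter name below is hypothetical; this file only calls
// getSparkTaskInfo()):
//
//   runtime.setSparkTaskInfo(taskInfo); // hypothetical setter
//   runtime.enableDumping();            // dumps conf now; plan/splits are dumped on parse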