cpp/velox/compute/VeloxPlanConverter.cc (97 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "VeloxPlanConverter.h" #include <filesystem> #include "compute/ResultIterator.h" #include "config/GlutenConfig.h" #include "operators/plannodes/RowVectorStream.h" #include "velox/common/file/FileSystems.h" namespace gluten { using namespace facebook; VeloxPlanConverter::VeloxPlanConverter( const std::vector<std::shared_ptr<ResultIterator>>& inputIters, velox::memory::MemoryPool* veloxPool, const std::unordered_map<std::string, std::string>& confMap, const std::optional<std::string> writeFilesTempPath, bool validationMode) : validationMode_(validationMode), substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath, validationMode), pool_(veloxPool) { // avoid include RowVectorStream.h in SubstraitToVeloxPlan.cpp, it may cause redefinition of array abi.h. auto factory = [inputIters = std::move(inputIters), validationMode = validationMode]( std::string nodeId, memory::MemoryPool* pool, int32_t streamIdx, RowTypePtr outputType) { std::shared_ptr<ResultIterator> iterator; if (!validationMode) { VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream index {} in input iterator list.", streamIdx); iterator = inputIters[streamIdx]; } auto valueStream = std::make_shared<RowVectorStream>(pool, iterator, outputType); return std::make_shared<ValueStreamNode>(nodeId, outputType, std::move(valueStream)); }; substraitVeloxPlanConverter_.setValueStreamNodeFactory(std::move(factory)); } namespace { std::shared_ptr<SplitInfo> parseScanSplitInfo( const google::protobuf::RepeatedPtrField<substrait::ReadRel_LocalFiles_FileOrFiles>& fileList) { using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase; auto splitInfo = std::make_shared<SplitInfo>(); splitInfo->paths.reserve(fileList.size()); splitInfo->starts.reserve(fileList.size()); splitInfo->lengths.reserve(fileList.size()); splitInfo->partitionColumns.reserve(fileList.size()); for (const auto& file : fileList) { // Expect all Partitions share the same index. splitInfo->partitionIndex = file.partition_index(); std::unordered_map<std::string, std::string> partitionColumnMap; for (const auto& partitionColumn : file.partition_columns()) { partitionColumnMap[partitionColumn.key()] = partitionColumn.value(); } splitInfo->partitionColumns.emplace_back(partitionColumnMap); splitInfo->paths.emplace_back(file.uri_file()); splitInfo->starts.emplace_back(file.start()); splitInfo->lengths.emplace_back(file.length()); switch (file.file_format_case()) { case SubstraitFileFormatCase::kOrc: splitInfo->format = dwio::common::FileFormat::ORC; break; case SubstraitFileFormatCase::kDwrf: splitInfo->format = dwio::common::FileFormat::DWRF; break; case SubstraitFileFormatCase::kParquet: splitInfo->format = dwio::common::FileFormat::PARQUET; break; case SubstraitFileFormatCase::kText: splitInfo->format = dwio::common::FileFormat::TEXT; break; default: splitInfo->format = dwio::common::FileFormat::UNKNOWN; break; } } return splitInfo; } void parseLocalFileNodes( SubstraitToVeloxPlanConverter* planConverter, std::vector<::substrait::ReadRel_LocalFiles>& localFiles) { std::vector<std::shared_ptr<SplitInfo>> splitInfos; splitInfos.reserve(localFiles.size()); for (int32_t i = 0; i < localFiles.size(); i++) { const auto& localFile = localFiles[i]; const auto& fileList = localFile.items(); splitInfos.push_back(std::move(parseScanSplitInfo(fileList))); } planConverter->setSplitInfos(std::move(splitInfos)); } } // namespace std::shared_ptr<const facebook::velox::core::PlanNode> VeloxPlanConverter::toVeloxPlan( const ::substrait::Plan& substraitPlan, std::vector<::substrait::ReadRel_LocalFiles> localFiles) { if (!validationMode_) { parseLocalFileNodes(&substraitVeloxPlanConverter_, localFiles); } auto veloxPlan = substraitVeloxPlanConverter_.toVeloxPlan(substraitPlan); DLOG(INFO) << "Plan Node: " << std::endl << veloxPlan->toString(true, true); return veloxPlan; } std::string VeloxPlanConverter::nextPlanNodeId() { auto id = fmt::format("{}", planNodeId_); planNodeId_++; return id; } } // namespace gluten