cpp/velox/compute/VeloxPlanConverter.cc (103 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "VeloxPlanConverter.h"
#include <filesystem>
#include "compute/ResultIterator.h"
#include "config/GlutenConfig.h"
#include "iceberg/IcebergPlanConverter.h"
#include "operators/plannodes/RowVectorStream.h"
#include "velox/common/file/FileSystems.h"
namespace gluten {
using namespace facebook;
// Builds a plan converter for one task.
//
// `veloxPool`, `confMap`, `writeFilesTempPath` and `validationMode` are forwarded
// to the wrapped SubstraitToVeloxPlanConverter; `validationMode` is also kept
// locally so toVeloxPlan() can decide whether to parse split information.
// `inputIters` is handed to the wrapped converter via setInputIters().
VeloxPlanConverter::VeloxPlanConverter(
    const std::vector<std::shared_ptr<ResultIterator>>& inputIters,
    velox::memory::MemoryPool* veloxPool,
    const std::unordered_map<std::string, std::string>& confMap,
    const std::optional<std::string> writeFilesTempPath,
    bool validationMode)
    : validationMode_(validationMode),
      substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath, validationMode) {
  // `inputIters` is a const reference: std::move on it would only produce a
  // `const&&`, which still copies. Pass it as-is so the copy is explicit.
  substraitVeloxPlanConverter_.setInputIters(inputIters);
}
namespace {
// Builds a SplitInfo describing every file of one substrait LocalFiles read.
//
// Per-file attributes (path, start, length, partition-column values,
// metadata-column values and file properties) are appended to parallel vectors
// in the order the files appear in `fileList`.
//
// Split-level attributes (partitionIndex and format) are overwritten by every
// file, so the values of the last file win; callers are expected to pass files
// that share them.
//
// For Iceberg files the current SplitInfo is handed to
// IcebergPlanConverter::parseIcebergSplitInfo and the pointer it returns is used
// from then on.
std::shared_ptr<SplitInfo> parseScanSplitInfo(
    const google::protobuf::RepeatedPtrField<substrait::ReadRel_LocalFiles_FileOrFiles>& fileList) {
  using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase;

  auto splitInfo = std::make_shared<SplitInfo>();
  splitInfo->paths.reserve(fileList.size());
  splitInfo->starts.reserve(fileList.size());
  splitInfo->lengths.reserve(fileList.size());
  splitInfo->partitionColumns.reserve(fileList.size());
  splitInfo->properties.reserve(fileList.size());
  splitInfo->metadataColumns.reserve(fileList.size());

  for (const auto& file : fileList) {
    // Expect all Partitions share the same index.
    splitInfo->partitionIndex = file.partition_index();

    std::unordered_map<std::string, std::string> partitionColumnMap;
    for (const auto& partitionColumn : file.partition_columns()) {
      partitionColumnMap[partitionColumn.key()] = partitionColumn.value();
    }
    // The map is not used again in this iteration, so move it instead of copying.
    splitInfo->partitionColumns.emplace_back(std::move(partitionColumnMap));

    std::unordered_map<std::string, std::string> metadataColumnMap;
    for (const auto& metadataColumn : file.metadata_columns()) {
      metadataColumnMap[metadataColumn.key()] = metadataColumn.value();
    }
    splitInfo->metadataColumns.emplace_back(std::move(metadataColumnMap));

    splitInfo->paths.emplace_back(file.uri_file());
    splitInfo->starts.emplace_back(file.start());
    splitInfo->lengths.emplace_back(file.length());

    // Left default-constructed when the file carries no properties.
    facebook::velox::FileProperties fileProps;
    if (file.has_properties()) {
      fileProps.fileSize = file.properties().filesize();
      fileProps.modificationTime = file.properties().modificationtime();
    }
    splitInfo->properties.emplace_back(std::move(fileProps));

    switch (file.file_format_case()) {
      case SubstraitFileFormatCase::kOrc:
        splitInfo->format = dwio::common::FileFormat::ORC;
        break;
      case SubstraitFileFormatCase::kDwrf:
        splitInfo->format = dwio::common::FileFormat::DWRF;
        break;
      case SubstraitFileFormatCase::kParquet:
        splitInfo->format = dwio::common::FileFormat::PARQUET;
        break;
      case SubstraitFileFormatCase::kText:
        splitInfo->format = dwio::common::FileFormat::TEXT;
        break;
      case SubstraitFileFormatCase::kIceberg:
        // The Iceberg converter takes ownership of the current SplitInfo and
        // returns the object that subsequent iterations keep filling.
        splitInfo = IcebergPlanConverter::parseIcebergSplitInfo(file, std::move(splitInfo));
        break;
      default:
        splitInfo->format = dwio::common::FileFormat::UNKNOWN;
        break;
    }
  }
  return splitInfo;
}
void parseLocalFileNodes(
SubstraitToVeloxPlanConverter* planConverter,
std::vector<::substrait::ReadRel_LocalFiles>& localFiles) {
std::vector<std::shared_ptr<SplitInfo>> splitInfos;
splitInfos.reserve(localFiles.size());
for (int32_t i = 0; i < localFiles.size(); i++) {
const auto& localFile = localFiles[i];
const auto& fileList = localFile.items();
splitInfos.push_back(parseScanSplitInfo(fileList));
}
planConverter->setSplitInfos(std::move(splitInfos));
}
} // namespace
// Converts `substraitPlan` into a Velox plan tree.
//
// Outside validation mode, the split information carried by `localFiles` is
// parsed and registered with the wrapped substrait converter before the plan is
// converted. In validation mode that step is skipped. In debug builds the
// resulting plan is logged.
std::shared_ptr<const facebook::velox::core::PlanNode> VeloxPlanConverter::toVeloxPlan(
    const ::substrait::Plan& substraitPlan,
    std::vector<::substrait::ReadRel_LocalFiles> localFiles) {
  if (!validationMode_) {
    parseLocalFileNodes(&substraitVeloxPlanConverter_, localFiles);
  }
  auto veloxPlan = substraitVeloxPlanConverter_.toVeloxPlan(substraitPlan);
  // '\n' emits the same newline as std::endl without the extra stream flush.
  DLOG(INFO) << "Plan Node: " << '\n' << veloxPlan->toString(true, true);
  return veloxPlan;
}
// Formats the current value of the plan-node counter with fmt's default "{}"
// spec, then advances the counter, and returns the formatted value.
std::string VeloxPlanConverter::nextPlanNodeId() {
  std::string formattedId = fmt::format("{}", planNodeId_);
  ++planNodeId_;
  return formattedId;
}
} // namespace gluten