/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SubstraitToVeloxPlan.h"
#include "utils/StringUtil.h"
#include "TypeUtils.h"
#include "VariantToVectorConverter.h"
#include "operators/plannodes/RowVectorStream.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/exec/TableWriter.h"
#include "velox/type/Filter.h"
#include "velox/type/Type.h"
#include "utils/ConfigExtractor.h"
#include "config.pb.h"
#include "config/GlutenConfig.h"
#include "config/VeloxConfig.h"
#include "operators/plannodes/RowVectorStream.h"
#include "operators/writer/VeloxParquetDataSource.h"
namespace gluten {
namespace {
core::SortOrder toSortOrder(const ::substrait::SortField& sortField) {
switch (sortField.direction()) {
case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST:
return core::kAscNullsFirst;
case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_LAST:
return core::kAscNullsLast;
case ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_FIRST:
return core::kDescNullsFirst;
case ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_LAST:
return core::kDescNullsLast;
default:
VELOX_FAIL("Sort direction is not supported.");
}
}
/// Holds the information required to create
/// a project node to simulate the emit
/// behavior in Substrait.
struct EmitInfo {
std::vector<core::TypedExprPtr> expressions;
std::vector<std::string> projectNames;
};
/// Helper function to extract the attributes required to create a ProjectNode
/// used for interpreting Substrait Emit.
EmitInfo getEmitInfo(const ::substrait::RelCommon& relCommon, const core::PlanNodePtr& node) {
const auto& emit = relCommon.emit();
int emitSize = emit.output_mapping_size();
EmitInfo emitInfo;
emitInfo.projectNames.reserve(emitSize);
emitInfo.expressions.reserve(emitSize);
const auto& outputType = node->outputType();
  for (int i = 0; i < emitSize; i++) {
    int32_t mapId = emit.output_mapping(i);
    // Use emplace_back: the vectors were only reserved, so indexed assignment
    // would write past their current (zero) size.
    emitInfo.projectNames.emplace_back(outputType->nameOf(mapId));
    emitInfo.expressions.emplace_back(
        std::make_shared<core::FieldAccessTypedExpr>(outputType->childAt(mapId), outputType->nameOf(mapId)));
  }
return emitInfo;
}
/// @brief Get the input type from both sides of join.
/// @param leftNode the plan node of left side.
/// @param rightNode the plan node of right side.
/// @return the input type.
RowTypePtr getJoinInputType(const core::PlanNodePtr& leftNode, const core::PlanNodePtr& rightNode) {
auto outputSize = leftNode->outputType()->size() + rightNode->outputType()->size();
std::vector<std::string> outputNames;
std::vector<std::shared_ptr<const Type>> outputTypes;
outputNames.reserve(outputSize);
outputTypes.reserve(outputSize);
for (const auto& node : {leftNode, rightNode}) {
const auto& names = node->outputType()->names();
outputNames.insert(outputNames.end(), names.begin(), names.end());
const auto& types = node->outputType()->children();
outputTypes.insert(outputTypes.end(), types.begin(), types.end());
}
return std::make_shared<const RowType>(std::move(outputNames), std::move(outputTypes));
}
/// @brief Get the direct output type of join.
/// @param leftNode the plan node of left side.
/// @param rightNode the plan node of right side.
/// @param joinType the join type.
/// @return the output type.
RowTypePtr getJoinOutputType(
const core::PlanNodePtr& leftNode,
const core::PlanNodePtr& rightNode,
const core::JoinType& joinType) {
// Decide output type.
// Output of right semi join cannot include columns from the left side.
bool outputMayIncludeLeftColumns = !(core::isRightSemiFilterJoin(joinType) || core::isRightSemiProjectJoin(joinType));
// Output of left semi and anti joins cannot include columns from the right
// side.
bool outputMayIncludeRightColumns =
!(core::isLeftSemiFilterJoin(joinType) || core::isLeftSemiProjectJoin(joinType) || core::isAntiJoin(joinType));
if (outputMayIncludeLeftColumns && outputMayIncludeRightColumns) {
return getJoinInputType(leftNode, rightNode);
}
if (outputMayIncludeLeftColumns) {
if (core::isLeftSemiProjectJoin(joinType)) {
std::vector<std::string> outputNames = leftNode->outputType()->names();
std::vector<std::shared_ptr<const Type>> outputTypes = leftNode->outputType()->children();
outputNames.emplace_back("exists");
outputTypes.emplace_back(BOOLEAN());
return std::make_shared<const RowType>(std::move(outputNames), std::move(outputTypes));
} else {
return leftNode->outputType();
}
}
if (outputMayIncludeRightColumns) {
if (core::isRightSemiProjectJoin(joinType)) {
std::vector<std::string> outputNames = rightNode->outputType()->names();
std::vector<std::shared_ptr<const Type>> outputTypes = rightNode->outputType()->children();
outputNames.emplace_back("exists");
outputTypes.emplace_back(BOOLEAN());
return std::make_shared<const RowType>(std::move(outputNames), std::move(outputTypes));
} else {
return rightNode->outputType();
}
}
VELOX_FAIL("Output should include left or right columns.");
}
} // namespace
core::PlanNodePtr SubstraitToVeloxPlanConverter::processEmit(
const ::substrait::RelCommon& relCommon,
const core::PlanNodePtr& noEmitNode) {
switch (relCommon.emit_kind_case()) {
case ::substrait::RelCommon::EmitKindCase::kDirect:
return noEmitNode;
case ::substrait::RelCommon::EmitKindCase::kEmit: {
auto emitInfo = getEmitInfo(relCommon, noEmitNode);
return std::make_shared<core::ProjectNode>(
nextPlanNodeId(), std::move(emitInfo.projectNames), std::move(emitInfo.expressions), noEmitNode);
}
default:
VELOX_FAIL("unrecognized emit kind");
}
}
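/// Decides the aggregation step for the whole AggregateRel. When the
/// 'allowFlush' flag is set in the advanced extension, a partial step is
/// returned so that Velox may flush partially aggregated results; otherwise
/// single-step aggregation is used.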
core::AggregationNode::Step SubstraitToVeloxPlanConverter::toAggregationStep(const ::substrait::AggregateRel& aggRel) {
// TODO Simplify Velox's aggregation steps
if (aggRel.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(aggRel.advanced_extension(), "allowFlush=")) {
return core::AggregationNode::Step::kPartial;
}
return core::AggregationNode::Step::kSingle;
}
/// Get aggregation function step for AggregateFunction.
/// The returned step value will be used to decide which Velox aggregate function or companion function
/// is used for the actual data processing.
core::AggregationNode::Step SubstraitToVeloxPlanConverter::toAggregationFunctionStep(
const ::substrait::AggregateFunction& sAggFuc) {
const auto& phase = sAggFuc.phase();
switch (phase) {
case ::substrait::AGGREGATION_PHASE_UNSPECIFIED:
VELOX_FAIL("Aggregation phase not specified.");
break;
case ::substrait::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE:
return core::AggregationNode::Step::kPartial;
case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE:
return core::AggregationNode::Step::kIntermediate;
case ::substrait::AGGREGATION_PHASE_INITIAL_TO_RESULT:
return core::AggregationNode::Step::kSingle;
case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT:
return core::AggregationNode::Step::kFinal;
default:
VELOX_FAIL("Unexpected aggregation phase.");
}
}
std::string SubstraitToVeloxPlanConverter::toAggregationFunctionName(
const std::string& baseName,
const core::AggregationNode::Step& step) {
std::string suffix;
switch (step) {
case core::AggregationNode::Step::kPartial:
suffix = "_partial";
break;
case core::AggregationNode::Step::kFinal:
suffix = "_merge_extract";
break;
case core::AggregationNode::Step::kIntermediate:
suffix = "_merge";
break;
case core::AggregationNode::Step::kSingle:
suffix = "";
break;
default:
VELOX_FAIL("Unexpected aggregation node step.");
}
return baseName + suffix;
}
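/// Converts a Substrait JoinRel into a Velox join node. The Substrait join
/// type is mapped to a Velox JoinType, with semi/anti variants refined via
/// flags in the advanced extension. Equi-join keys are extracted from the
/// join expression, and a MergeJoinNode is created when 'isSMJ' is set,
/// otherwise a HashJoinNode.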
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::JoinRel& sJoin) {
if (!sJoin.has_left()) {
VELOX_FAIL("Left Rel is expected in JoinRel.");
}
if (!sJoin.has_right()) {
VELOX_FAIL("Right Rel is expected in JoinRel.");
}
auto leftNode = toVeloxPlan(sJoin.left());
auto rightNode = toVeloxPlan(sJoin.right());
// Map join type.
core::JoinType joinType;
bool isNullAwareAntiJoin = false;
switch (sJoin.type()) {
case ::substrait::JoinRel_JoinType::JoinRel_JoinType_JOIN_TYPE_INNER:
joinType = core::JoinType::kInner;
break;
case ::substrait::JoinRel_JoinType::JoinRel_JoinType_JOIN_TYPE_OUTER:
joinType = core::JoinType::kFull;
break;
case ::substrait::JoinRel_JoinType::JoinRel_JoinType_JOIN_TYPE_LEFT:
joinType = core::JoinType::kLeft;
break;
case ::substrait::JoinRel_JoinType::JoinRel_JoinType_JOIN_TYPE_RIGHT:
joinType = core::JoinType::kRight;
break;
case ::substrait::JoinRel_JoinType::JoinRel_JoinType_JOIN_TYPE_LEFT_SEMI:
// Determine the semi join type based on extracted information.
if (sJoin.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isExistenceJoin=")) {
joinType = core::JoinType::kLeftSemiProject;
} else {
joinType = core::JoinType::kLeftSemiFilter;
}
break;
case ::substrait::JoinRel_JoinType::JoinRel_JoinType_JOIN_TYPE_RIGHT_SEMI:
// Determine the semi join type based on extracted information.
if (sJoin.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isExistenceJoin=")) {
joinType = core::JoinType::kRightSemiProject;
} else {
joinType = core::JoinType::kRightSemiFilter;
}
break;
case ::substrait::JoinRel_JoinType::JoinRel_JoinType_JOIN_TYPE_LEFT_ANTI: {
// Determine the anti join type based on extracted information.
if (sJoin.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isNullAwareAntiJoin=")) {
isNullAwareAntiJoin = true;
}
joinType = core::JoinType::kAnti;
break;
}
default:
VELOX_NYI("Unsupported Join type: {}", std::to_string(sJoin.type()));
}
  // Extract the equi-join keys from the join expression.
std::vector<const ::substrait::Expression::FieldReference*> leftExprs, rightExprs;
extractJoinKeys(sJoin.expression(), leftExprs, rightExprs);
VELOX_CHECK_EQ(leftExprs.size(), rightExprs.size());
size_t numKeys = leftExprs.size();
std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>> leftKeys, rightKeys;
leftKeys.reserve(numKeys);
rightKeys.reserve(numKeys);
auto inputRowType = getJoinInputType(leftNode, rightNode);
for (size_t i = 0; i < numKeys; ++i) {
leftKeys.emplace_back(exprConverter_->toVeloxExpr(*leftExprs[i], inputRowType));
rightKeys.emplace_back(exprConverter_->toVeloxExpr(*rightExprs[i], inputRowType));
}
core::TypedExprPtr filter;
if (sJoin.has_post_join_filter()) {
filter = exprConverter_->toVeloxExpr(sJoin.post_join_filter(), inputRowType);
}
if (sJoin.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isSMJ=")) {
    // Create MergeJoinNode.
return std::make_shared<core::MergeJoinNode>(
nextPlanNodeId(),
joinType,
leftKeys,
rightKeys,
filter,
leftNode,
rightNode,
getJoinOutputType(leftNode, rightNode, joinType));
} else {
    // Create HashJoinNode.
return std::make_shared<core::HashJoinNode>(
nextPlanNodeId(),
joinType,
isNullAwareAntiJoin,
leftKeys,
rightKeys,
filter,
leftNode,
rightNode,
getJoinOutputType(leftNode, rightNode, joinType));
}
}
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::CrossRel& crossRel) {
  // Convert CrossRel into a NestedLoopJoinNode. A join condition is optional.
if (!crossRel.has_left()) {
VELOX_FAIL("Left Rel is expected in CrossRel.");
}
if (!crossRel.has_right()) {
VELOX_FAIL("Right Rel is expected in CrossRel.");
}
auto leftNode = toVeloxPlan(crossRel.left());
auto rightNode = toVeloxPlan(crossRel.right());
// Map join type.
core::JoinType joinType;
switch (crossRel.type()) {
case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_INNER:
joinType = core::JoinType::kInner;
break;
case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_LEFT:
joinType = core::JoinType::kLeft;
break;
case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_OUTER:
joinType = core::JoinType::kFull;
break;
default:
VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type()));
}
auto inputRowType = getJoinInputType(leftNode, rightNode);
core::TypedExprPtr joinConditions;
if (crossRel.has_expression()) {
joinConditions = exprConverter_->toVeloxExpr(crossRel.expression(), inputRowType);
}
return std::make_shared<core::NestedLoopJoinNode>(
nextPlanNodeId(),
joinType,
joinConditions,
leftNode,
rightNode,
getJoinOutputType(leftNode, rightNode, joinType));
}
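/// Converts an AggregateRel into a Velox AggregationNode. Grouping
/// expressions are limited to field references. Each measure becomes one
/// Velox aggregate whose function name carries a companion-function suffix
/// derived from the Substrait aggregation phase. 'isStreaming' marks the
/// grouping keys as pre-grouped, and 'ignoreNullKeys' drops rows with null
/// grouping keys.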
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::AggregateRel& aggRel) {
auto childNode = convertSingleInput<::substrait::AggregateRel>(aggRel);
core::AggregationNode::Step aggStep = toAggregationStep(aggRel);
const auto& inputType = childNode->outputType();
std::vector<core::FieldAccessTypedExprPtr> veloxGroupingExprs;
// Get the grouping expressions.
for (const auto& grouping : aggRel.groupings()) {
for (const auto& groupingExpr : grouping.grouping_expressions()) {
// Velox's groupings are limited to be Field.
veloxGroupingExprs.emplace_back(exprConverter_->toVeloxExpr(groupingExpr.selection(), inputType));
}
}
// Parse measures and get the aggregate expressions.
// Each measure represents one aggregate expression.
std::vector<core::AggregationNode::Aggregate> aggregates;
aggregates.reserve(aggRel.measures().size());
for (const auto& measure : aggRel.measures()) {
    core::FieldAccessTypedExprPtr mask;
    // Get the aggregation mask from the measure's filter, if present.
    if (measure.has_filter()) {
      const auto& substraitAggMask = measure.filter();
      if (substraitAggMask.ByteSizeLong() > 0) {
        mask = std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(
            exprConverter_->toVeloxExpr(substraitAggMask, inputType));
      }
    }
const auto& aggFunction = measure.measure();
auto baseFuncName = SubstraitParser::findVeloxFunction(functionMap_, aggFunction.function_reference());
auto funcName = toAggregationFunctionName(baseFuncName, toAggregationFunctionStep(aggFunction));
std::vector<core::TypedExprPtr> aggParams;
aggParams.reserve(aggFunction.arguments().size());
for (const auto& arg : aggFunction.arguments()) {
aggParams.emplace_back(exprConverter_->toVeloxExpr(arg.value(), inputType));
}
auto aggVeloxType = SubstraitParser::parseType(aggFunction.output_type());
auto aggExpr = std::make_shared<const core::CallTypedExpr>(aggVeloxType, std::move(aggParams), funcName);
std::vector<TypePtr> rawInputTypes =
SubstraitParser::sigToTypes(SubstraitParser::findFunctionSpec(functionMap_, aggFunction.function_reference()));
aggregates.emplace_back(core::AggregationNode::Aggregate{aggExpr, rawInputTypes, mask, {}, {}});
}
bool ignoreNullKeys = false;
std::vector<core::FieldAccessTypedExprPtr> preGroupingExprs;
if (aggRel.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(aggRel.advanced_extension(), "isStreaming=")) {
preGroupingExprs.reserve(veloxGroupingExprs.size());
preGroupingExprs.insert(preGroupingExprs.begin(), veloxGroupingExprs.begin(), veloxGroupingExprs.end());
}
if (aggRel.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(aggRel.advanced_extension(), "ignoreNullKeys=")) {
ignoreNullKeys = true;
}
// Get the output names of Aggregation.
std::vector<std::string> aggOutNames;
aggOutNames.reserve(aggRel.measures().size());
for (int idx = veloxGroupingExprs.size(); idx < veloxGroupingExprs.size() + aggRel.measures().size(); idx++) {
aggOutNames.emplace_back(SubstraitParser::makeNodeName(planNodeId_, idx));
}
auto aggregationNode = std::make_shared<core::AggregationNode>(
nextPlanNodeId(),
aggStep,
veloxGroupingExprs,
preGroupingExprs,
aggOutNames,
aggregates,
ignoreNullKeys,
childNode);
if (aggRel.has_common()) {
return processEmit(aggRel.common(), std::move(aggregationNode));
} else {
return aggregationNode;
}
}
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ProjectRel& projectRel) {
auto childNode = convertSingleInput<::substrait::ProjectRel>(projectRel);
// Construct Velox Expressions.
const auto& projectExprs = projectRel.expressions();
std::vector<std::string> projectNames;
std::vector<core::TypedExprPtr> expressions;
projectNames.reserve(projectExprs.size());
expressions.reserve(projectExprs.size());
const auto& inputType = childNode->outputType();
int colIdx = 0;
  // Substrait's projection emits the project expressions on top of the input
  // columns, so the input columns must be added first, followed by the
  // projection expressions.
  // First, add the names and expressions of the input columns.
for (uint32_t idx = 0; idx < inputType->size(); idx++) {
const auto& fieldName = inputType->nameOf(idx);
projectNames.emplace_back(fieldName);
expressions.emplace_back(std::make_shared<core::FieldAccessTypedExpr>(inputType->childAt(idx), fieldName));
colIdx += 1;
}
  // Then, add the projection expressions with generated names.
for (const auto& expr : projectExprs) {
expressions.emplace_back(exprConverter_->toVeloxExpr(expr, inputType));
projectNames.emplace_back(SubstraitParser::makeNodeName(planNodeId_, colIdx));
colIdx += 1;
}
if (projectRel.has_common()) {
    const auto& relCommon = projectRel.common();
const auto& emit = relCommon.emit();
int emitSize = emit.output_mapping_size();
std::vector<std::string> emitProjectNames(emitSize);
std::vector<core::TypedExprPtr> emitExpressions(emitSize);
for (int i = 0; i < emitSize; i++) {
int32_t mapId = emit.output_mapping(i);
emitProjectNames[i] = projectNames[mapId];
emitExpressions[i] = expressions[mapId];
}
return std::make_shared<core::ProjectNode>(
nextPlanNodeId(), std::move(emitProjectNames), std::move(emitExpressions), std::move(childNode));
} else {
return std::make_shared<core::ProjectNode>(
nextPlanNodeId(), std::move(projectNames), std::move(expressions), std::move(childNode));
}
}
std::string makeUuid() {
return generateUuid();
}
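// Maps a compression kind to the file-name suffix embedded in written files,
// e.g. ".snappy"; uncompressed output gets no suffix.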
std::string compressionFileNameSuffix(common::CompressionKind kind) {
switch (static_cast<int32_t>(kind)) {
case common::CompressionKind_ZLIB:
return ".zlib";
case common::CompressionKind_SNAPPY:
return ".snappy";
case common::CompressionKind_LZO:
return ".lzo";
case common::CompressionKind_ZSTD:
return ".zstd";
case common::CompressionKind_LZ4:
return ".lz4";
case common::CompressionKind_GZIP:
return ".gz";
case common::CompressionKind_NONE:
default:
return "";
}
}
std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
const std::string& targetDirectory,
dwio::common::FileFormat fileFormat,
common::CompressionKind compression,
const bool& isBucketed,
const std::optional<std::string>& writeDirectory = std::nullopt,
const connector::hive::LocationHandle::TableType& tableType =
connector::hive::LocationHandle::TableType::kExisting) {
std::string targetFileName = "";
if (fileFormat == dwio::common::FileFormat::PARQUET && !isBucketed) {
targetFileName = fmt::format("gluten-part-{}{}{}", makeUuid(), compressionFileNameSuffix(compression), ".parquet");
}
return std::make_shared<connector::hive::LocationHandle>(
targetDirectory, writeDirectory.value_or(targetDirectory), tableType, targetFileName);
}
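// Assembles a HiveInsertTableHandle: each table column becomes a
// HiveColumnHandle classified as either a partition key or a regular column,
// and the partition, bucket, and sort column counts are validated against
// the table schema and bucket property.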
std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandle(
const std::vector<std::string>& tableColumnNames,
const std::vector<TypePtr>& tableColumnTypes,
const std::vector<std::string>& partitionedBy,
const std::shared_ptr<connector::hive::HiveBucketProperty>& bucketProperty,
const std::shared_ptr<connector::hive::LocationHandle>& locationHandle,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions,
const dwio::common::FileFormat& tableStorageFormat = dwio::common::FileFormat::PARQUET,
const std::optional<common::CompressionKind>& compressionKind = {}) {
std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>> columnHandles;
columnHandles.reserve(tableColumnNames.size());
std::vector<std::string> bucketedBy;
std::vector<TypePtr> bucketedTypes;
std::vector<std::shared_ptr<const connector::hive::HiveSortingColumn>> sortedBy;
if (bucketProperty != nullptr) {
bucketedBy = bucketProperty->bucketedBy();
bucketedTypes = bucketProperty->bucketedTypes();
sortedBy = bucketProperty->sortedBy();
}
int32_t numPartitionColumns{0};
int32_t numSortingColumns{0};
int32_t numBucketColumns{0};
for (int i = 0; i < tableColumnNames.size(); ++i) {
for (int j = 0; j < bucketedBy.size(); ++j) {
if (bucketedBy[j] == tableColumnNames[i]) {
++numBucketColumns;
}
}
for (int j = 0; j < sortedBy.size(); ++j) {
if (sortedBy[j]->sortColumn() == tableColumnNames[i]) {
++numSortingColumns;
}
}
if (std::find(partitionedBy.cbegin(), partitionedBy.cend(), tableColumnNames.at(i)) != partitionedBy.cend()) {
++numPartitionColumns;
columnHandles.emplace_back(std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kPartitionKey,
tableColumnTypes.at(i),
tableColumnTypes.at(i)));
} else {
columnHandles.emplace_back(std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kRegular,
tableColumnTypes.at(i),
tableColumnTypes.at(i)));
}
}
VELOX_CHECK_EQ(numPartitionColumns, partitionedBy.size());
VELOX_CHECK_EQ(numBucketColumns, bucketedBy.size());
VELOX_CHECK_EQ(numSortingColumns, sortedBy.size());
return std::make_shared<connector::hive::HiveInsertTableHandle>(
columnHandles,
locationHandle,
tableStorageFormat,
bucketProperty,
compressionKind,
std::unordered_map<std::string, std::string>{},
writerOptions);
}
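/// Converts a WriteRel into a Velox TableWriteNode. Partition keys are read
/// from the table schema, an optional bucket spec is translated into a
/// HiveBucketProperty, and writer configs are unpacked from the advanced
/// extension. Only the parquet format is currently supported.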
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WriteRel& writeRel) {
core::PlanNodePtr childNode;
if (writeRel.has_input()) {
childNode = toVeloxPlan(writeRel.input());
} else {
VELOX_FAIL("Child Rel is expected in WriteRel.");
}
const auto& inputType = childNode->outputType();
std::vector<std::string> tableColumnNames;
std::vector<std::string> partitionedKey;
std::vector<ColumnType> columnTypes;
tableColumnNames.reserve(writeRel.table_schema().names_size());
VELOX_CHECK(writeRel.has_table_schema(), "WriteRel should have the table schema to store the column information");
const auto& tableSchema = writeRel.table_schema();
SubstraitParser::parseColumnTypes(tableSchema, columnTypes);
for (const auto& name : tableSchema.names()) {
tableColumnNames.emplace_back(name);
}
for (int i = 0; i < tableSchema.names_size(); i++) {
if (columnTypes[i] == ColumnType::kPartitionKey) {
partitionedKey.emplace_back(tableColumnNames[i]);
}
}
std::shared_ptr<connector::hive::HiveBucketProperty> bucketProperty = nullptr;
if (writeRel.has_bucket_spec()) {
const auto& bucketSpec = writeRel.bucket_spec();
const auto& numBuckets = bucketSpec.num_buckets();
std::vector<std::string> bucketedBy;
for (const auto& name : bucketSpec.bucket_column_names()) {
bucketedBy.emplace_back(name);
}
std::vector<TypePtr> bucketedTypes;
bucketedTypes.reserve(bucketedBy.size());
std::vector<TypePtr> tableColumnTypes = inputType->children();
for (const auto& name : bucketedBy) {
auto it = std::find(tableColumnNames.begin(), tableColumnNames.end(), name);
VELOX_CHECK(it != tableColumnNames.end(), "Invalid bucket {}", name);
std::size_t index = std::distance(tableColumnNames.begin(), it);
bucketedTypes.emplace_back(tableColumnTypes[index]);
}
std::vector<std::shared_ptr<const connector::hive::HiveSortingColumn>> sortedBy;
for (const auto& name : bucketSpec.sort_column_names()) {
sortedBy.emplace_back(std::make_shared<connector::hive::HiveSortingColumn>(name, core::SortOrder{true, true}));
}
bucketProperty = std::make_shared<connector::hive::HiveBucketProperty>(
connector::hive::HiveBucketProperty::Kind::kHiveCompatible, numBuckets, bucketedBy, bucketedTypes, sortedBy);
}
std::string writePath;
if (writeFilesTempPath_.has_value()) {
writePath = writeFilesTempPath_.value();
} else {
VELOX_CHECK(validationMode_, "WriteRel should have the write path before initializing the plan.");
writePath = "";
}
GLUTEN_CHECK(writeRel.named_table().has_advanced_extension(), "Advanced extension not found in WriteRel");
const auto& ext = writeRel.named_table().advanced_extension();
GLUTEN_CHECK(ext.has_optimization(), "Extension optimization not found in WriteRel");
const auto& opt = ext.optimization();
gluten::ConfigMap confMap;
opt.UnpackTo(&confMap);
std::unordered_map<std::string, std::string> writeConfs;
for (const auto& item : *(confMap.mutable_configs())) {
writeConfs.emplace(item.first, item.second);
}
// Currently only support parquet format.
const std::string& formatShortName = writeConfs["format"];
GLUTEN_CHECK(formatShortName == "parquet", "Unsupported file write format: " + formatShortName);
dwio::common::FileFormat fileFormat = dwio::common::FileFormat::PARQUET;
const std::shared_ptr<facebook::velox::parquet::WriterOptions> writerOptions =
VeloxParquetDataSource::makeParquetWriteOption(writeConfs);
  // Spark's default compression codec is snappy.
const auto& compressionKind =
writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY);
return std::make_shared<core::TableWriteNode>(
nextPlanNodeId(),
inputType,
tableColumnNames,
nullptr, /*aggregationNode*/
std::make_shared<core::InsertTableHandle>(
kHiveConnectorId,
makeHiveInsertTableHandle(
            tableColumnNames, /*inputType->names() is not used: its column names may differ*/
inputType->children(),
partitionedKey,
bucketProperty,
makeLocationHandle(writePath, fileFormat, compressionKind, bucketProperty != nullptr),
writerOptions,
fileFormat,
compressionKind)),
(!partitionedKey.empty()),
exec::TableWriteTraits::outputType(nullptr),
connector::CommitStrategy::kNoCommit,
childNode);
}
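/// Converts an ExpandRel into a Velox ExpandNode. Each Substrait field maps
/// to one projection set, and every duplicate inside a switching field must
/// be either a field reference or a literal.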
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ExpandRel& expandRel) {
core::PlanNodePtr childNode;
if (expandRel.has_input()) {
childNode = toVeloxPlan(expandRel.input());
} else {
VELOX_FAIL("Child Rel is expected in ExpandRel.");
}
const auto& inputType = childNode->outputType();
std::vector<std::vector<core::TypedExprPtr>> projectSetExprs;
projectSetExprs.reserve(expandRel.fields_size());
for (const auto& projections : expandRel.fields()) {
std::vector<core::TypedExprPtr> projectExprs;
projectExprs.reserve(projections.switching_field().duplicates_size());
for (const auto& projectExpr : projections.switching_field().duplicates()) {
if (projectExpr.has_selection()) {
auto expression = exprConverter_->toVeloxExpr(projectExpr.selection(), inputType);
projectExprs.emplace_back(expression);
} else if (projectExpr.has_literal()) {
auto expression = exprConverter_->toVeloxExpr(projectExpr.literal());
projectExprs.emplace_back(expression);
} else {
VELOX_FAIL("The project in Expand Operator only support field or literal.");
}
}
projectSetExprs.emplace_back(projectExprs);
}
auto projectSize = expandRel.fields()[0].switching_field().duplicates_size();
std::vector<std::string> names;
names.reserve(projectSize);
for (int idx = 0; idx < projectSize; idx++) {
names.push_back(SubstraitParser::makeNodeName(planNodeId_, idx));
}
return std::make_shared<core::ExpandNode>(nextPlanNodeId(), projectSetExprs, std::move(names), childNode);
}
namespace {
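// Appends a FieldAccessTypedExpr for column 'index' of the child's output to
// 'unnestFields'. For a ProjectNode child, the projection's name and type at
// that index are used directly.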
void extractUnnestFieldExpr(
std::shared_ptr<const core::PlanNode> child,
int32_t index,
std::vector<core::FieldAccessTypedExprPtr>& unnestFields) {
if (auto projNode = std::dynamic_pointer_cast<const core::ProjectNode>(child)) {
auto name = projNode->names()[index];
auto expr = projNode->projections()[index];
auto type = expr->type();
auto unnestFieldExpr = std::make_shared<core::FieldAccessTypedExpr>(type, name);
    VELOX_CHECK_NOT_NULL(unnestFieldExpr, "The unnest key only supports a field reference.");
unnestFields.emplace_back(unnestFieldExpr);
} else {
auto name = child->outputType()->names()[index];
auto field = child->outputType()->childAt(index);
auto unnestFieldExpr = std::make_shared<core::FieldAccessTypedExpr>(field, name);
unnestFields.emplace_back(unnestFieldExpr);
}
}
} // namespace
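/// Converts a GenerateRel into a Velox UnnestNode. Replicated columns come
/// from child_output. The columns to unnest are located either in the
/// injected project below the generator (when 'injectedProject' is set) or
/// directly from the generator function's first argument.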
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::GenerateRel& generateRel) {
core::PlanNodePtr childNode;
if (generateRel.has_input()) {
childNode = toVeloxPlan(generateRel.input());
} else {
VELOX_FAIL("Child Rel is expected in GenerateRel.");
}
const auto& inputType = childNode->outputType();
std::vector<core::FieldAccessTypedExprPtr> replicated;
std::vector<core::FieldAccessTypedExprPtr> unnest;
const auto& generator = generateRel.generator();
const auto& requiredChildOutput = generateRel.child_output();
replicated.reserve(requiredChildOutput.size());
for (const auto& output : requiredChildOutput) {
auto expression = exprConverter_->toVeloxExpr(output, inputType);
auto exprField = dynamic_cast<const core::FieldAccessTypedExpr*>(expression.get());
    VELOX_CHECK(exprField != nullptr, "The child output in the Generate operator only supports fields.");
replicated.emplace_back(std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(expression));
}
auto injectedProject = generateRel.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(generateRel.advanced_extension(), "injectedProject=");
if (injectedProject) {
// Child should be either ProjectNode or ValueStreamNode in case of project fallback.
VELOX_CHECK(
(std::dynamic_pointer_cast<const core::ProjectNode>(childNode) != nullptr ||
std::dynamic_pointer_cast<const ValueStreamNode>(childNode) != nullptr) &&
childNode->outputType()->size() > requiredChildOutput.size(),
"injectedProject is true, but the ProjectNode or ValueStreamNode (in case of projection fallback)"
" is missing or does not have the corresponding projection field");
bool isStack = generateRel.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(generateRel.advanced_extension(), "isStack=");
// Generator function's input is NOT a field reference.
if (!isStack) {
      // For a generator function other than stack, e.g. explode(array(1,2,3)),
      // a sample input Substrait plan looks like the following:
      //
      // Generate explode([1,2,3] AS _pre_0#129), false, [col#126]
      // +- Project [fake_column#128, [1,2,3] AS _pre_0#129]
      //    +- RewrittenNodeWall Scan OneRowRelation[fake_column#128]
      //
      // The last projection column in GenerateRel's child (Project) is the
      // column to unnest.
auto index = childNode->outputType()->size() - 1;
extractUnnestFieldExpr(childNode, index, unnest);
} else {
      // For the stack function, e.g. stack(2, 1,2,3), a sample input Substrait
      // plan looks like the following:
      //
      // Generate stack(2, id#122, name#123, id1#124, name1#125), false, [col0#137, col1#138]
      // +- Project [id#122, name#123, id1#124, name1#125, array(id#122, id1#124) AS _pre_0#141,
      //             array(name#123, name1#125) AS _pre_1#142]
      //    +- RewrittenNodeWall LocalTableScan [id#122, name#123, id1#124, name1#125]
      //
      // The last `numFields` projections are the fields to unnest.
auto generatorFunc = generator.scalar_function();
auto numRows = SubstraitParser::getLiteralValue<int32_t>(generatorFunc.arguments(0).value().literal());
auto numFields = static_cast<int32_t>(std::ceil((generatorFunc.arguments_size() - 1.0) / numRows));
auto totalProjectCount = childNode->outputType()->size();
for (auto i = totalProjectCount - numFields; i < totalProjectCount; ++i) {
extractUnnestFieldExpr(childNode, i, unnest);
}
}
} else {
    // The generator function's input is a field reference, e.g. explode(col);
    // its first argument is the field to unnest. This assumption holds for
    // all the supported generator functions: explode, posexplode, and inline.
auto generatorFunc = generator.scalar_function();
auto unnestExpr = exprConverter_->toVeloxExpr(generatorFunc.arguments(0).value(), inputType);
auto unnestFieldExpr = std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(unnestExpr);
    VELOX_CHECK_NOT_NULL(unnestFieldExpr, "The unnest key only supports a field reference.");
unnest.emplace_back(unnestFieldExpr);
}
std::vector<std::string> unnestNames;
int unnestIndex = 0;
for (const auto& variable : unnest) {
if (variable->type()->isArray()) {
unnestNames.emplace_back(SubstraitParser::makeNodeName(planNodeId_, unnestIndex++));
} else if (variable->type()->isMap()) {
unnestNames.emplace_back(SubstraitParser::makeNodeName(planNodeId_, unnestIndex++));
unnestNames.emplace_back(SubstraitParser::makeNodeName(planNodeId_, unnestIndex++));
} else {
VELOX_FAIL(
"Unexpected type of unnest variable. Expected ARRAY or MAP, but got {}.", variable->type()->toString());
}
}
std::optional<std::string> ordinalityName = std::nullopt;
if (generateRel.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(generateRel.advanced_extension(), "isPosExplode=")) {
ordinalityName = std::make_optional<std::string>("pos");
}
return std::make_shared<core::UnnestNode>(
nextPlanNodeId(), replicated, unnest, std::move(unnestNames), ordinalityName, childNode);
}
const core::WindowNode::Frame SubstraitToVeloxPlanConverter::createWindowFrame(
const ::substrait::Expression_WindowFunction_Bound& lower_bound,
const ::substrait::Expression_WindowFunction_Bound& upper_bound,
const ::substrait::WindowType& type,
const RowTypePtr& inputType) {
core::WindowNode::Frame frame;
switch (type) {
case ::substrait::WindowType::ROWS:
frame.type = core::WindowNode::WindowType::kRows;
break;
case ::substrait::WindowType::RANGE:
frame.type = core::WindowNode::WindowType::kRange;
break;
default:
VELOX_FAIL("the window type only support ROWS and RANGE, and the input type is ", std::to_string(type));
}
auto specifiedBound =
[&](bool hasOffset, int64_t offset, const ::substrait::Expression& columnRef) -> core::TypedExprPtr {
if (hasOffset) {
VELOX_CHECK(
frame.type != core::WindowNode::WindowType::kRange,
"for RANGE frame offset, we should pre-calculate the range frame boundary and pass the column reference, but got a constant offset.");
return std::make_shared<core::ConstantTypedExpr>(BIGINT(), variant(offset));
} else {
VELOX_CHECK(
frame.type != core::WindowNode::WindowType::kRows, "for ROW frame offset, we should pass a constant offset.");
return exprConverter_->toVeloxExpr(columnRef, inputType);
}
};
auto boundTypeConversion = [&](::substrait::Expression_WindowFunction_Bound boundType)
-> std::tuple<core::WindowNode::BoundType, core::TypedExprPtr> {
if (boundType.has_current_row()) {
return std::make_tuple(core::WindowNode::BoundType::kCurrentRow, nullptr);
} else if (boundType.has_unbounded_following()) {
return std::make_tuple(core::WindowNode::BoundType::kUnboundedFollowing, nullptr);
} else if (boundType.has_unbounded_preceding()) {
return std::make_tuple(core::WindowNode::BoundType::kUnboundedPreceding, nullptr);
} else if (boundType.has_following()) {
auto following = boundType.following();
return std::make_tuple(
core::WindowNode::BoundType::kFollowing,
specifiedBound(following.has_offset(), following.offset(), following.ref()));
} else if (boundType.has_preceding()) {
auto preceding = boundType.preceding();
return std::make_tuple(
core::WindowNode::BoundType::kPreceding,
specifiedBound(preceding.has_offset(), preceding.offset(), preceding.ref()));
} else {
VELOX_FAIL("The BoundType is not supported.");
}
};
std::tie(frame.startType, frame.startValue) = boundTypeConversion(lower_bound);
std::tie(frame.endType, frame.endValue) = boundTypeConversion(upper_bound);
return frame;
}
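/// Converts a WindowRel into a Velox WindowNode. Each measure becomes one
/// window function. Partition and sorting keys must be field references and
/// are de-duplicated, with sorting keys that overlap partition keys dropped.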
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WindowRel& windowRel) {
core::PlanNodePtr childNode;
if (windowRel.has_input()) {
childNode = toVeloxPlan(windowRel.input());
} else {
VELOX_FAIL("Child Rel is expected in WindowRel.");
}
const auto& inputType = childNode->outputType();
// Parse measures and get the window expressions.
// Each measure represents one window expression.
std::vector<core::WindowNode::Function> windowNodeFunctions;
std::vector<std::string> windowColumnNames;
windowNodeFunctions.reserve(windowRel.measures().size());
for (const auto& smea : windowRel.measures()) {
const auto& windowFunction = smea.measure();
std::string funcName = SubstraitParser::findVeloxFunction(functionMap_, windowFunction.function_reference());
std::vector<core::TypedExprPtr> windowParams;
auto& argumentList = windowFunction.arguments();
windowParams.reserve(argumentList.size());
const auto& options = windowFunction.options();
    // For functions in kOffsetWindowFunctions (see Spark OffsetWindowFunctions),
    // the first option is expected to be named `ignoreNulls` when ignoreNulls
    // is true.
bool ignoreNulls = false;
if (!options.empty() && options.at(0).name() == "ignoreNulls") {
ignoreNulls = true;
}
for (const auto& arg : argumentList) {
windowParams.emplace_back(exprConverter_->toVeloxExpr(arg.value(), inputType));
}
auto windowVeloxType = SubstraitParser::parseType(windowFunction.output_type());
auto windowCall = std::make_shared<const core::CallTypedExpr>(windowVeloxType, std::move(windowParams), funcName);
auto upperBound = windowFunction.upper_bound();
auto lowerBound = windowFunction.lower_bound();
auto type = windowFunction.window_type();
windowColumnNames.push_back(windowFunction.column_name());
windowNodeFunctions.push_back(
        {std::move(windowCall), createWindowFrame(lowerBound, upperBound, type, inputType), ignoreNulls});
}
// Construct partitionKeys
std::vector<core::FieldAccessTypedExprPtr> partitionKeys;
std::unordered_set<std::string> keyNames;
const auto& partitions = windowRel.partition_expressions();
partitionKeys.reserve(partitions.size());
for (const auto& partition : partitions) {
auto expression = exprConverter_->toVeloxExpr(partition, inputType);
core::FieldAccessTypedExprPtr veloxPartitionKey =
std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(expression);
VELOX_USER_CHECK_NOT_NULL(veloxPartitionKey, "Window Operator only supports field partition key.");
    // Constructs unique partition keys.
if (keyNames.insert(veloxPartitionKey->name()).second) {
partitionKeys.emplace_back(veloxPartitionKey);
}
}
std::vector<core::FieldAccessTypedExprPtr> sortingKeys;
std::vector<core::SortOrder> sortingOrders;
const auto& [rawSortingKeys, rawSortingOrders] = processSortField(windowRel.sorts(), inputType);
for (vector_size_t i = 0; i < rawSortingKeys.size(); ++i) {
// Constructs unique sort keys and excludes keys overlapped with partition keys.
if (keyNames.insert(rawSortingKeys[i]->name()).second) {
sortingKeys.emplace_back(rawSortingKeys[i]);
sortingOrders.emplace_back(rawSortingOrders[i]);
}
}
  // 'isStreaming' indicates the input is already sorted on the partition and
  // sorting keys.
  const bool inputsSorted = windowRel.has_advanced_extension() &&
      SubstraitParser::configSetInOptimization(windowRel.advanced_extension(), "isStreaming=");
  return std::make_shared<core::WindowNode>(
      nextPlanNodeId(),
      partitionKeys,
      sortingKeys,
      sortingOrders,
      windowColumnNames,
      windowNodeFunctions,
      inputsSorted,
      childNode);
}
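/// Converts a WindowGroupLimitRel into a Velox node: a RowNumberNode when no
/// sorting keys remain after de-duplication against the partition keys,
/// otherwise a TopNRowNumberNode keeping the top 'limit' rows per partition.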
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(
const ::substrait::WindowGroupLimitRel& windowGroupLimitRel) {
core::PlanNodePtr childNode;
if (windowGroupLimitRel.has_input()) {
childNode = toVeloxPlan(windowGroupLimitRel.input());
} else {
VELOX_FAIL("Child Rel is expected in WindowGroupLimitRel.");
}
const auto& inputType = childNode->outputType();
// Construct partitionKeys
std::vector<core::FieldAccessTypedExprPtr> partitionKeys;
std::unordered_set<std::string> keyNames;
const auto& partitions = windowGroupLimitRel.partition_expressions();
partitionKeys.reserve(partitions.size());
for (const auto& partition : partitions) {
auto expression = exprConverter_->toVeloxExpr(partition, inputType);
core::FieldAccessTypedExprPtr veloxPartitionKey =
std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(expression);
VELOX_USER_CHECK_NOT_NULL(veloxPartitionKey, "Window Group Limit Operator only supports field partition key.");
// Constructs unique partition keys.
if (keyNames.insert(veloxPartitionKey->name()).second) {
partitionKeys.emplace_back(veloxPartitionKey);
}
}
std::vector<core::FieldAccessTypedExprPtr> sortingKeys;
std::vector<core::SortOrder> sortingOrders;
const auto& [rawSortingKeys, rawSortingOrders] = processSortField(windowGroupLimitRel.sorts(), inputType);
for (vector_size_t i = 0; i < rawSortingKeys.size(); ++i) {
// Constructs unique sort keys and excludes keys overlapped with partition keys.
if (keyNames.insert(rawSortingKeys[i]->name()).second) {
sortingKeys.emplace_back(rawSortingKeys[i]);
sortingOrders.emplace_back(rawSortingOrders[i]);
}
}
const std::optional<std::string> rowNumberColumnName = std::nullopt;
if (sortingKeys.empty()) {
// Handle if all sorting keys are also used as partition keys.
return std::make_shared<core::RowNumberNode>(
nextPlanNodeId(),
partitionKeys,
rowNumberColumnName,
static_cast<int32_t>(windowGroupLimitRel.limit()),
childNode);
}
return std::make_shared<core::TopNRowNumberNode>(
nextPlanNodeId(),
partitionKeys,
sortingKeys,
sortingOrders,
rowNumberColumnName,
static_cast<int32_t>(windowGroupLimitRel.limit()),
childNode);
}
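/// Converts a SetRel into a Velox plan. Only UNION_ALL is supported: each
/// input is wrapped in a ProjectNode that casts and renames its columns to
/// the first child's schema, and the children are combined with a gathering
/// LocalPartitionNode.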
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::SetRel& setRel) {
switch (setRel.op()) {
case ::substrait::SetRel_SetOp::SetRel_SetOp_SET_OP_UNION_ALL: {
std::vector<core::PlanNodePtr> children;
for (int32_t i = 0; i < setRel.inputs_size(); ++i) {
const auto& input = setRel.inputs(i);
children.push_back(toVeloxPlan(input));
}
GLUTEN_CHECK(!children.empty(), "At least one source is required for Velox LocalPartition");
// Velox doesn't allow different field names in schemas of LocalPartitionNode's children.
// Add project nodes to unify the schemas.
const RowTypePtr outRowType = asRowType(children[0]->outputType());
      std::vector<std::string> outNames;
      outNames.reserve(outRowType->size());
      for (int32_t colIdx = 0; colIdx < outRowType->size(); ++colIdx) {
        // Use the first child's field names (not the type names) so that all
        // children are projected to the same schema.
        outNames.push_back(outRowType->nameOf(colIdx));
      }
std::vector<core::PlanNodePtr> projectedChildren;
for (int32_t i = 0; i < children.size(); ++i) {
const auto& child = children[i];
const RowTypePtr& childRowType = child->outputType();
std::vector<core::TypedExprPtr> expressions;
for (int32_t colIdx = 0; colIdx < outNames.size(); ++colIdx) {
const auto fa =
std::make_shared<core::FieldAccessTypedExpr>(childRowType->childAt(colIdx), childRowType->nameOf(colIdx));
const auto cast = std::make_shared<core::CastTypedExpr>(outRowType->childAt(colIdx), fa, false);
expressions.push_back(cast);
}
auto project = std::make_shared<core::ProjectNode>(nextPlanNodeId(), outNames, expressions, child);
projectedChildren.push_back(project);
}
return std::make_shared<core::LocalPartitionNode>(
nextPlanNodeId(),
core::LocalPartitionNode::Type::kGather,
false,
std::make_shared<core::GatherPartitionFunctionSpec>(),
projectedChildren);
}
default:
throw GlutenException("Unsupported SetRel op: " + std::to_string(setRel.op()));
}
}
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::SortRel& sortRel) {
auto childNode = convertSingleInput<::substrait::SortRel>(sortRel);
auto [sortingKeys, sortingOrders] = processSortField(sortRel.sorts(), childNode->outputType());
return std::make_shared<core::OrderByNode>(
nextPlanNodeId(), sortingKeys, sortingOrders, false /*isPartial*/, childNode);
}
std::pair<std::vector<core::FieldAccessTypedExprPtr>, std::vector<core::SortOrder>>
SubstraitToVeloxPlanConverter::processSortField(
const ::google::protobuf::RepeatedPtrField<::substrait::SortField>& sortFields,
const RowTypePtr& inputType) {
std::vector<core::FieldAccessTypedExprPtr> sortingKeys;
std::vector<core::SortOrder> sortingOrders;
std::unordered_set<std::string> uniqueKeys;
for (const auto& sort : sortFields) {
GLUTEN_CHECK(sort.has_expr(), "Sort field must have expr");
auto expression = exprConverter_->toVeloxExpr(sort.expr(), inputType);
auto fieldExpr = std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(expression);
VELOX_USER_CHECK_NOT_NULL(fieldExpr, "Sort Operator only supports field sorting key");
if (uniqueKeys.insert(fieldExpr->name()).second) {
sortingKeys.emplace_back(fieldExpr);
sortingOrders.emplace_back(toSortOrder(sort));
}
}
return {sortingKeys, sortingOrders};
}
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::FilterRel& filterRel) {
auto childNode = convertSingleInput<::substrait::FilterRel>(filterRel);
auto filterNode = std::make_shared<core::FilterNode>(
nextPlanNodeId(), exprConverter_->toVeloxExpr(filterRel.condition(), childNode->outputType()), childNode);
if (filterRel.has_common()) {
return processEmit(filterRel.common(), std::move(filterNode));
} else {
return filterNode;
}
}
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::FetchRel& fetchRel) {
auto childNode = convertSingleInput<::substrait::FetchRel>(fetchRel);
return std::make_shared<core::LimitNode>(
nextPlanNodeId(),
static_cast<int32_t>(fetchRel.offset()),
static_cast<int32_t>(fetchRel.count()),
false /*isPartial*/,
childNode);
}
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::TopNRel& topNRel) {
auto childNode = convertSingleInput<::substrait::TopNRel>(topNRel);
auto [sortingKeys, sortingOrders] = processSortField(topNRel.sorts(), childNode->outputType());
return std::make_shared<core::TopNNode>(
nextPlanNodeId(), sortingKeys, sortingOrders, static_cast<int32_t>(topNRel.n()), false /*isPartial*/, childNode);
}
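/// Builds a ValueStreamNode that pulls batches from the input iterator at
/// 'streamIdx'. In validation mode no iterator is attached because the plan
/// is only validated, not executed.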
core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
const ::substrait::ReadRel& readRel,
int32_t streamIdx) {
// Get the input schema of this iterator.
uint64_t colNum = 0;
std::vector<TypePtr> veloxTypeList;
if (readRel.has_base_schema()) {
const auto& baseSchema = readRel.base_schema();
    // The input names are not used. Instead, new output names are generated
    // because Velox's ValueStreamNode does not support renaming columns.
colNum = baseSchema.names().size();
veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema);
}
std::vector<std::string> outNames;
outNames.reserve(colNum);
for (int idx = 0; idx < colNum; idx++) {
auto colName = SubstraitParser::makeNodeName(planNodeId_, idx);
outNames.emplace_back(colName);
}
auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
std::shared_ptr<ResultIterator> iterator;
if (!validationMode_) {
VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx);
iterator = inputIters_[streamIdx];
}
auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, std::move(iterator));
auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
splitInfoMap_[node->id()] = splitInfo;
return node;
}
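/// Eagerly drains the input iterator at 'streamIdx' into RowVectors and
/// wraps them in a ValuesNode, which, unlike ValueStreamNode, is
/// serializable.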
core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValuesNode(
const ::substrait::ReadRel& readRel,
int32_t streamIdx) {
std::vector<RowVectorPtr> values;
VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx);
const auto iterator = inputIters_[streamIdx];
while (iterator->hasNext()) {
auto cb = VeloxColumnarBatch::from(defaultLeafVeloxMemoryPool().get(), iterator->next());
values.emplace_back(cb->getRowVector());
}
auto node = std::make_shared<facebook::velox::core::ValuesNode>(nextPlanNodeId(), std::move(values));
auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
splitInfoMap_[node->id()] = splitInfo;
return node;
}
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ReadRel& readRel) {
  // Emit is not allowed in TableScanNode- and ValuesNode-related outputs.
if (readRel.has_common()) {
VELOX_USER_CHECK(
!readRel.common().has_emit(), "Emit not supported for ValuesNode and TableScanNode related Substrait plans.");
}
// Check if the ReadRel specifies an input of stream. If yes, build ValueStreamNode as the data source.
auto streamIdx = getStreamIndex(readRel);
if (streamIdx >= 0) {
    // Query trace is only enabled in benchmarks. When it is enabled, replace
    // ValueStreamNode with ValuesNode to support plan serialization.
    if (LIKELY(confMap_[kQueryTraceEnabled] != "true")) {
return constructValueStreamNode(readRel, streamIdx);
} else {
return constructValuesNode(readRel, streamIdx);
}
}
  // Otherwise, create a TableScanNode for the ReadRel.
auto splitInfo = std::make_shared<SplitInfo>();
if (!validationMode_) {
VELOX_CHECK_LT(splitInfoIdx_, splitInfos_.size(), "Plan must have readRel and related split info.");
splitInfo = splitInfos_[splitInfoIdx_++];
}
// Get output names and types.
std::vector<std::string> colNameList;
std::vector<TypePtr> veloxTypeList;
std::vector<ColumnType> columnTypes;
// Convert field names into lower case when not case-sensitive.
std::unique_ptr<facebook::velox::config::ConfigBase> veloxCfg =
std::make_unique<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap_));
bool asLowerCase = !veloxCfg->get<bool>(kCaseSensitive, false);
if (readRel.has_base_schema()) {
const auto& baseSchema = readRel.base_schema();
colNameList.reserve(baseSchema.names().size());
for (const auto& name : baseSchema.names()) {
std::string fieldName = name;
if (asLowerCase) {
folly::toLowerAscii(fieldName);
}
colNameList.emplace_back(fieldName);
}
veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema, asLowerCase);
SubstraitParser::parseColumnTypes(baseSchema, columnTypes);
}
  // Velox requires filter pushdown to be enabled.
bool filterPushdownEnabled = true;
auto names = colNameList;
auto types = veloxTypeList;
auto dataColumns = ROW(std::move(names), std::move(types));
std::shared_ptr<connector::hive::HiveTableHandle> tableHandle;
if (!readRel.has_filter()) {
tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
kHiveConnectorId, "hive_table", filterPushdownEnabled, common::SubfieldFilters{}, nullptr, dataColumns);
} else {
common::SubfieldFilters subfieldFilters;
auto remainingFilter = exprConverter_->toVeloxExpr(readRel.filter(), dataColumns);
tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
kHiveConnectorId,
"hive_table",
filterPushdownEnabled,
std::move(subfieldFilters),
remainingFilter,
dataColumns);
}
// Get assignments and out names.
std::vector<std::string> outNames;
outNames.reserve(colNameList.size());
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>> assignments;
for (int idx = 0; idx < colNameList.size(); idx++) {
auto outName = SubstraitParser::makeNodeName(planNodeId_, idx);
auto columnType = columnTypes[idx];
assignments[outName] = std::make_shared<connector::hive::HiveColumnHandle>(
colNameList[idx], columnType, veloxTypeList[idx], veloxTypeList[idx]);
outNames.emplace_back(outName);
}
auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
if (readRel.has_virtual_table()) {
return toVeloxPlan(readRel, outputType);
} else {
auto tableScanNode = std::make_shared<core::TableScanNode>(
nextPlanNodeId(), std::move(outputType), std::move(tableHandle), std::move(assignments));
// Set split info map.
splitInfoMap_[tableScanNode->id()] = splitInfo;
return tableScanNode;
}
}
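/// Converts a ReadRel carrying a virtual table into a Velox ValuesNode. Each
/// struct literal holds one batch laid out column-major, so field index
/// 'col * batchSize + batchId' addresses row 'batchId' of column 'col'.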
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(
const ::substrait::ReadRel& readRel,
const RowTypePtr& type) {
::substrait::ReadRel_VirtualTable readVirtualTable = readRel.virtual_table();
int64_t numVectors = readVirtualTable.values_size();
int64_t numColumns = type->size();
int64_t valueFieldNums = readVirtualTable.values(numVectors - 1).fields_size();
std::vector<RowVectorPtr> vectors;
vectors.reserve(numVectors);
int64_t batchSize;
  // For empty vectors, e.g. vectors = makeRowVector(ROW({}, {}), 1).
if (numColumns == 0) {
batchSize = 1;
} else {
batchSize = valueFieldNums / numColumns;
}
for (int64_t index = 0; index < numVectors; ++index) {
std::vector<VectorPtr> children;
::substrait::Expression_Literal_Struct rowValue = readRel.virtual_table().values(index);
auto fieldSize = rowValue.fields_size();
VELOX_CHECK_EQ(fieldSize, batchSize * numColumns);
for (int64_t col = 0; col < numColumns; ++col) {
const TypePtr& outputChildType = type->childAt(col);
std::vector<variant> batchChild;
batchChild.reserve(batchSize);
for (int64_t batchId = 0; batchId < batchSize; batchId++) {
// each value in the batch
auto fieldIdx = col * batchSize + batchId;
::substrait::Expression_Literal field = rowValue.fields(fieldIdx);
auto expr = exprConverter_->toVeloxExpr(field);
if (auto constantExpr = std::dynamic_pointer_cast<const core::ConstantTypedExpr>(expr)) {
if (!constantExpr->hasValueVector()) {
batchChild.emplace_back(constantExpr->value());
} else {
VELOX_UNSUPPORTED("Values node with complex type values is not supported yet");
}
} else {
VELOX_FAIL("Expected constant expression");
}
}
children.emplace_back(setVectorFromVariants(outputChildType, batchChild, pool_));
}
vectors.emplace_back(std::make_shared<RowVector>(pool_, type, nullptr, batchSize, children));
}
return std::make_shared<core::ValuesNode>(nextPlanNodeId(), std::move(vectors));
}
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::Rel& rel) {
if (rel.has_aggregate()) {
return toVeloxPlan(rel.aggregate());
} else if (rel.has_project()) {
return toVeloxPlan(rel.project());
} else if (rel.has_filter()) {
return toVeloxPlan(rel.filter());
} else if (rel.has_join()) {
return toVeloxPlan(rel.join());
} else if (rel.has_cross()) {
return toVeloxPlan(rel.cross());
} else if (rel.has_read()) {
return toVeloxPlan(rel.read());
} else if (rel.has_sort()) {
return toVeloxPlan(rel.sort());
} else if (rel.has_expand()) {
return toVeloxPlan(rel.expand());
} else if (rel.has_generate()) {
return toVeloxPlan(rel.generate());
} else if (rel.has_fetch()) {
return toVeloxPlan(rel.fetch());
} else if (rel.has_top_n()) {
return toVeloxPlan(rel.top_n());
} else if (rel.has_window()) {
return toVeloxPlan(rel.window());
} else if (rel.has_write()) {
return toVeloxPlan(rel.write());
} else if (rel.has_windowgrouplimit()) {
return toVeloxPlan(rel.windowgrouplimit());
} else if (rel.has_set()) {
return toVeloxPlan(rel.set());
} else {
VELOX_NYI("Substrait conversion not supported for Rel.");
}
}
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::RelRoot& root) {
// TODO: Use the names as the output names for the whole computing.
// const auto& names = root.names();
if (root.has_input()) {
const auto& rel = root.input();
return toVeloxPlan(rel);
} else {
VELOX_FAIL("Input is expected in RelRoot.");
}
}
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::Plan& substraitPlan) {
  VELOX_CHECK(checkTypeExtension(substraitPlan), "Only the UNKNOWN type extension is supported.");
// Construct the function map based on the Substrait representation,
// and initialize the expression converter with it.
constructFunctionMap(substraitPlan);
// In fact, only one RelRoot or Rel is expected here.
VELOX_CHECK_EQ(substraitPlan.relations_size(), 1);
const auto& rel = substraitPlan.relations(0);
if (rel.has_root()) {
return toVeloxPlan(rel.root());
} else if (rel.has_rel()) {
return toVeloxPlan(rel.rel());
} else {
VELOX_FAIL("RelRoot or Rel is expected in Plan.");
}
}
std::string SubstraitToVeloxPlanConverter::nextPlanNodeId() {
auto id = fmt::format("{}", planNodeId_);
planNodeId_++;
return id;
}
void SubstraitToVeloxPlanConverter::constructFunctionMap(const ::substrait::Plan& substraitPlan) {
// Construct the function map based on the Substrait representation.
for (const auto& extension : substraitPlan.extensions()) {
if (!extension.has_extension_function()) {
continue;
}
const auto& sFmap = extension.extension_function();
auto id = sFmap.function_anchor();
auto name = sFmap.name();
functionMap_[id] = name;
}
exprConverter_ = std::make_unique<SubstraitVeloxExprConverter>(pool_, functionMap_);
}
std::string SubstraitToVeloxPlanConverter::findFuncSpec(uint64_t id) {
return SubstraitParser::findFunctionSpec(functionMap_, id);
}
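/// Returns the stream index when the ReadRel's first local file uri has the
/// form "iterator:${index}"; returns -1 for a regular file scan or an empty
/// file list.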
int32_t SubstraitToVeloxPlanConverter::getStreamIndex(const ::substrait::ReadRel& sRead) {
if (sRead.has_local_files()) {
const auto& fileList = sRead.local_files().items();
if (fileList.size() == 0) {
      // A bucketed scan may contain an empty file list.
return -1;
}
// The stream input will be specified with the format of
// "iterator:${index}".
std::string filePath = fileList[0].uri_file();
std::string prefix = "iterator:";
std::size_t pos = filePath.find(prefix);
if (pos == std::string::npos) {
return -1;
}
// Get the index.
    std::string idxStr = filePath.substr(pos + prefix.size());
    try {
      return std::stoi(idxStr);
} catch (const std::exception& err) {
VELOX_FAIL(err.what());
}
}
return -1;
}
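/// Iteratively walks the conjunctive join expression, splitting 'and' nodes
/// and collecting the left and right field references of each equality
/// comparison into 'leftExprs' and 'rightExprs'.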
void SubstraitToVeloxPlanConverter::extractJoinKeys(
const ::substrait::Expression& joinExpression,
std::vector<const ::substrait::Expression::FieldReference*>& leftExprs,
std::vector<const ::substrait::Expression::FieldReference*>& rightExprs) {
std::stack<const ::substrait::Expression*> expressions;
expressions.push(&joinExpression);
while (!expressions.empty()) {
auto visited = expressions.top();
expressions.pop();
if (visited->rex_type_case() == ::substrait::Expression::RexTypeCase::kScalarFunction) {
const auto& funcName = SubstraitParser::getNameBeforeDelimiter(
SubstraitParser::findVeloxFunction(functionMap_, visited->scalar_function().function_reference()));
const auto& args = visited->scalar_function().arguments();
if (funcName == "and") {
expressions.push(&args[1].value());
expressions.push(&args[0].value());
} else if (funcName == "eq" || funcName == "equalto" || funcName == "decimal_equalto") {
VELOX_CHECK(std::all_of(args.cbegin(), args.cend(), [](const ::substrait::FunctionArgument& arg) {
return arg.value().has_selection();
}));
leftExprs.push_back(&args[0].value().selection());
rightExprs.push_back(&args[1].value().selection());
} else {
VELOX_NYI("Join condition {} not supported.", funcName);
}
} else {
VELOX_FAIL("Unable to parse from join expression: {}", joinExpression.DebugString());
}
}
}
bool SubstraitToVeloxPlanConverter::checkTypeExtension(const ::substrait::Plan& substraitPlan) {
for (const auto& sExtension : substraitPlan.extensions()) {
if (!sExtension.has_extension_type()) {
continue;
}
// Only support UNKNOWN type in UserDefined type extension.
if (sExtension.extension_type().name() != "UNKNOWN") {
return false;
}
}
return true;
}
} // namespace gluten