cpp/velox/substrait/SubstraitToVeloxPlan.h (119 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "SubstraitToVeloxExpr.h"
#include "TypeUtils.h"
#include "velox/connectors/hive/FileProperties.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/core/PlanNode.h"
#include "velox/dwio/common/Options.h"
namespace gluten {
/// Forward declaration only: this header refers to ResultIterator solely through
/// std::shared_ptr<ResultIterator>, so the full definition is not needed here.
class ResultIterator;
/// Describes the input of one scan: either a stream input or a list of files
/// (with byte ranges and per-file metadata) to be read.
///
/// Every scalar member has a default member initializer, so a
/// default-constructed SplitInfo never exposes an indeterminate value.
///
/// NOTE(review): the per-file vectors (paths, starts, lengths, properties,
/// partitionColumns, metadataColumns) are presumably index-aligned, i.e. entry
/// i of each vector describes the same file -- confirm against the code that
/// fills them.
///
/// NOTE(review): u_int32_t / u_int64_t are non-standard BSD-style typedefs.
/// They are kept as-is so that the member types seen by callers do not change.
struct SplitInfo {
  /// Whether the split comes from arrow array stream node.
  bool isStream = false;

  /// The Partition index. Defaults to 0 until the producer sets it.
  u_int32_t partitionIndex = 0;

  /// The partition columns associated with partitioned table.
  std::vector<std::unordered_map<std::string, std::string>> partitionColumns;

  /// The metadata columns associated with partitioned table.
  std::vector<std::unordered_map<std::string, std::string>> metadataColumns;

  /// The file paths to be scanned.
  std::vector<std::string> paths;

  /// The file starts in the scan.
  std::vector<u_int64_t> starts;

  /// The lengths to be scanned.
  std::vector<u_int64_t> lengths;

  /// The file format of the files to be scanned. Defaults to UNKNOWN until the
  /// producer sets the real format.
  dwio::common::FileFormat format = dwio::common::FileFormat::UNKNOWN;

  /// The file sizes and modification times of the files to be scanned.
  std::vector<std::optional<facebook::velox::FileProperties>> properties;

  /// Make SplitInfo polymorphic, so that derived split descriptions can be
  /// destroyed through a SplitInfo pointer.
  virtual ~SplitInfo() = default;
};
/// This class is used to convert the Substrait plan into Velox plan.
///
/// A converter instance carries conversion state (plan node id counter,
/// function map, split infos, pre-built input nodes), so the setters below are
/// expected to be called before the corresponding toVeloxPlan() overloads.
class SubstraitToVeloxPlanConverter {
 public:
  /// @param pool Memory pool. Kept as a raw pointer; this class declares no
  /// destructor, so the pool is never released here.
  /// @param confMap A map of custom configs; copied into the converter.
  /// @param writeFilesTempPath The temporary path used to write files, if any.
  /// @param validationMode A flag used to specify validation.
  SubstraitToVeloxPlanConverter(
      memory::MemoryPool* pool,
      const std::unordered_map<std::string, std::string>& confMap = {},
      std::optional<std::string> writeFilesTempPath = std::nullopt,
      bool validationMode = false)
      : pool_(pool),
        confMap_(confMap),
        // Sink parameter: taken by value and moved, so the path string is not copied a second time.
        writeFilesTempPath_(std::move(writeFilesTempPath)),
        validationMode_(validationMode) {}

  /// Used to convert Substrait WriteRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::WriteRel& writeRel);

  /// Used to convert Substrait ExpandRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::ExpandRel& expandRel);

  /// Used to convert Substrait GenerateRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::GenerateRel& generateRel);

  /// Used to convert Substrait WindowRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::WindowRel& windowRel);

  /// Used to convert Substrait WindowGroupLimitRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::WindowGroupLimitRel& windowGroupLimitRel);

  /// Used to convert Substrait SetRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::SetRel& setRel);

  /// Used to convert Substrait JoinRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::JoinRel& joinRel);

  /// Used to convert Substrait CrossRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::CrossRel& crossRel);

  /// Used to convert Substrait AggregateRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::AggregateRel& aggRel);

  /// Convert Substrait ProjectRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::ProjectRel& projectRel);

  /// Convert Substrait FilterRel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::FilterRel& filterRel);

  /// Convert Substrait FetchRel into Velox LimitNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::FetchRel& fetchRel);

  /// Convert Substrait TopNRel into Velox TopNNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::TopNRel& topNRel);

  /// Convert Substrait ReadRel into Velox Values Node.
  core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& readRel, const RowTypePtr& type);

  /// Convert Substrait SortRel into Velox OrderByNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::SortRel& sortRel);

  /// Convert Substrait ReadRel into Velox PlanNode.
  /// Index: the index of the partition this item belongs to.
  /// Starts: the start positions in byte to read from the items.
  /// Lengths: the lengths in byte to read from the items.
  /// FileProperties: the file sizes and modification times of the files to be scanned.
  core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& sRead);

  /// Builds the plan node for a ReadRel that reads from the input stream with index 'streamIdx'.
  /// NOTE(review): presumably delegates to the factory installed through
  /// setValueStreamNodeFactory() -- confirm in the implementation file.
  core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx);

  // Only used by benchmarks and when query trace is enabled; loads all the data into a ValuesNode.
  core::PlanNodePtr constructValuesNode(const ::substrait::ReadRel& sRead, int32_t streamIdx);

  /// Used to convert Substrait Rel into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::Rel& sRel);

  /// Used to convert Substrait RelRoot into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::RelRoot& sRoot);

  /// Used to convert Substrait Plan into Velox PlanNode.
  core::PlanNodePtr toVeloxPlan(const ::substrait::Plan& substraitPlan);

  /// Returns the raw pointer of the expression converter. The converter keeps
  /// ownership; the result is null while no expression converter has been created.
  SubstraitVeloxExprConverter* getExprConverter() {
    return exprConverter_.get();
  }

  /// Used to construct the function map between the index
  /// and the Substrait function name. Initialize the expression
  /// converter based on the constructed function map.
  void constructFunctionMap(const ::substrait::Plan& substraitPlan);

  /// Will return the function map used by this plan converter.
  const std::unordered_map<uint64_t, std::string>& getFunctionMap() const {
    return functionMap_;
  }

  /// Return the splitInfo map used by this plan converter.
  const std::unordered_map<core::PlanNodeId, std::shared_ptr<SplitInfo>>& splitInfos() const {
    return splitInfoMap_;
  }

  /// Used to insert a certain plan node as input. Registers 'inputNode' under
  /// 'inputIdx' (replacing any node already stored for that index) and resets
  /// the plan node id counter, so that plan node ids start from 'planNodeId'.
  void insertInputNode(uint64_t inputIdx, const std::shared_ptr<const core::PlanNode>& inputNode, int planNodeId) {
    inputNodesMap_[inputIdx] = inputNode;
    planNodeId_ = planNodeId;
  }

  /// Replaces the list of split infos held by this converter.
  void setSplitInfos(std::vector<std::shared_ptr<SplitInfo>> splitInfos) {
    // The parameter is already a private copy; move it instead of copying the vector again.
    splitInfos_ = std::move(splitInfos);
  }

  /// Installs the factory used to create plan nodes for stream inputs.
  /// NOTE(review): the factory arguments are presumably (plan node id, memory
  /// pool, stream index, output row type) -- confirm against the call site.
  void setValueStreamNodeFactory(
      std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t, RowTypePtr)> factory) {
    valueStreamNodeFactory_ = std::move(factory);
  }

  /// Replaces the list of input iterators held by this converter.
  void setInputIters(std::vector<std::shared_ptr<ResultIterator>> inputIters) {
    inputIters_ = std::move(inputIters);
  }

  /// Used to check if ReadRel specifies an input of stream.
  /// If yes, the index of input stream will be returned.
  /// If not, -1 will be returned.
  int32_t getStreamIndex(const ::substrait::ReadRel& sRel);

  /// Used to find the function specification in the constructed function map.
  std::string findFuncSpec(uint64_t id);

  /// Extract join keys from joinExpression.
  /// joinExpression is a boolean condition that describes whether each record
  /// from the left set "match" the record from the right set. The condition
  /// must only include the following operations: AND, ==, field references.
  /// Field references correspond to the direct output order of the data.
  void extractJoinKeys(
      const ::substrait::Expression& joinExpression,
      std::vector<const ::substrait::Expression::FieldReference*>& leftExprs,
      std::vector<const ::substrait::Expression::FieldReference*>& rightExprs);

  /// Get aggregation step from AggregateRel.
  /// If Partial is returned, the generated aggregate can leverage flushing and abandoning like
  /// what streaming pre-aggregation can do in MPP databases.
  core::AggregationNode::Step toAggregationStep(const ::substrait::AggregateRel& sAgg);

  /// Get aggregation function step for AggregateFunction.
  /// The returned step value will be used to decide which Velox aggregate function or companion function
  /// is used for the actual data processing.
  core::AggregationNode::Step toAggregationFunctionStep(const ::substrait::AggregateFunction& sAggFuc);

  /// We use companion functions if the aggregate is not single.
  std::string toAggregationFunctionName(const std::string& baseName, const core::AggregationNode::Step& step);

  /// Helper Function to convert Substrait sortField to Velox sortingKeys and
  /// sortingOrders.
  /// Note that, this method would deduplicate the sorting keys which have the same field name.
  std::pair<std::vector<core::FieldAccessTypedExprPtr>, std::vector<core::SortOrder>> processSortField(
      const ::google::protobuf::RepeatedPtrField<::substrait::SortField>& sortField,
      const RowTypePtr& inputType);

 private:
  /// Integrate Substrait emit feature. Here a given 'substrait::RelCommon'
  /// is passed and check if emit is defined for this relation. Basically a
  /// ProjectNode is added on top of 'noEmitNode' to represent output order
  /// specified in 'relCommon::emit'. Return 'noEmitNode' as is
  /// if output order is 'kDirect'.
  core::PlanNodePtr processEmit(const ::substrait::RelCommon& relCommon, const core::PlanNodePtr& noEmitNode);

  /// Check the Substrait type extension only has one unknown extension.
  static bool checkTypeExtension(const ::substrait::Plan& substraitPlan);

  /// Returns unique ID to use for plan node. Produces sequential numbers
  /// starting from zero.
  std::string nextPlanNodeId();

  /// Used to convert AggregateRel into Velox plan node.
  /// The output of child node will be used as the input of Aggregation.
  std::shared_ptr<const core::PlanNode> toVeloxAgg(
      const ::substrait::AggregateRel& sAgg,
      const std::shared_ptr<const core::PlanNode>& childNode,
      const core::AggregationNode::Step& aggStep);

  /// Helper function to convert the input of Substrait Rel to Velox Node.
  /// 'rel' is taken by const reference so that the Substrait message is not
  /// copied just to read its input. Fails the VELOX_CHECK when 'rel' has no input.
  template <typename T>
  core::PlanNodePtr convertSingleInput(const T& rel) {
    VELOX_CHECK(rel.has_input(), "Child Rel is expected here.");
    return toVeloxPlan(rel.input());
  }

  /// Builds the Velox window frame from the Substrait lower/upper bounds and window type.
  const core::WindowNode::Frame createWindowFrame(
      const ::substrait::Expression_WindowFunction_Bound& lower_bound,
      const ::substrait::Expression_WindowFunction_Bound& upper_bound,
      const ::substrait::WindowType& type,
      const RowTypePtr& inputType);

  /// The unique identification for each PlanNode.
  int planNodeId_ = 0;

  /// The map storing the relations between the function id and the function
  /// name. Will be constructed based on the Substrait representation.
  std::unordered_map<uint64_t, std::string> functionMap_;

  /// The map storing the split stats for each PlanNode.
  std::unordered_map<core::PlanNodeId, std::shared_ptr<SplitInfo>> splitInfoMap_;

  /// Factory installed through setValueStreamNodeFactory(); empty until then.
  std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t, RowTypePtr)> valueStreamNodeFactory_;

  /// Input iterators installed through setInputIters().
  std::vector<std::shared_ptr<ResultIterator>> inputIters_;

  /// The map storing the pre-built plan nodes which can be accessed through
  /// index. This map is only used when the computation of a Substrait plan
  /// depends on other input nodes.
  std::unordered_map<uint64_t, std::shared_ptr<const core::PlanNode>> inputNodesMap_;

  /// NOTE(review): presumably the position of the next entry of 'splitInfos_'
  /// to hand out while converting ReadRels -- confirm in the implementation file.
  int32_t splitInfoIdx_{0};

  /// Split infos installed through setSplitInfos().
  std::vector<std::shared_ptr<SplitInfo>> splitInfos_;

  /// The Expression converter used to convert Substrait representations into
  /// Velox expressions.
  std::unique_ptr<SubstraitVeloxExprConverter> exprConverter_;

  /// Memory pool.
  memory::MemoryPool* pool_;

  /// A map of custom configs.
  std::unordered_map<std::string, std::string> confMap_;

  /// The temporary path used to write files.
  std::optional<std::string> writeFilesTempPath_;

  /// A flag used to specify validation.
  bool validationMode_ = false;
};
} // namespace gluten