/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "SubstraitToVeloxPlan.h"
#include "TypeUtils.h"
#include "VariantToVectorConverter.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/exec/TableWriter.h"
#include "velox/type/Type.h"

#include "utils/ConfigExtractor.h"

#include "config/GlutenConfig.h"

namespace gluten {
namespace {

/// Translates the direction of a Substrait sort field into the corresponding
/// Velox sort order.
/// @param sortField the Substrait sort field whose direction is read.
/// @return the Velox sort order for the four supported ASC/DESC and
/// NULLS_FIRST/NULLS_LAST combinations; fails for any other direction.
core::SortOrder toSortOrder(const ::substrait::SortField& sortField) {
  const auto direction = sortField.direction();
  if (direction == ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST) {
    return core::kAscNullsFirst;
  }
  if (direction == ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_LAST) {
    return core::kAscNullsLast;
  }
  if (direction == ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_FIRST) {
    return core::kDescNullsFirst;
  }
  if (direction == ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_LAST) {
    return core::kDescNullsLast;
  }
  VELOX_FAIL("Sort direction is not supported.");
}

/// Holds the information required to create
/// a project node to simulate the emit
/// behavior in Substrait.
struct EmitInfo {
  /// Expressions to project, one per emitted output column.
  std::vector<core::TypedExprPtr> expressions;
  /// Names of the projected columns, parallel to 'expressions'.
  std::vector<std::string> projectNames;
};

/// Helper function to extract the attributes required to create a ProjectNode
/// used for interpreting Substrait Emit.
/// @param relCommon the RelCommon whose emit output mapping is read.
/// @param node the plan node whose output columns the mapping indexes into.
/// @return one project name and one field-access expression per entry of the
/// output mapping, in mapping order.
EmitInfo getEmitInfo(const ::substrait::RelCommon& relCommon, const core::PlanNodePtr& node) {
  const auto& emit = relCommon.emit();
  const int emitSize = emit.output_mapping_size();
  EmitInfo emitInfo;
  emitInfo.projectNames.reserve(emitSize);
  emitInfo.expressions.reserve(emitSize);
  const auto& outputType = node->outputType();
  for (int i = 0; i < emitSize; i++) {
    const int32_t mapId = emit.output_mapping(i);
    // reserve() only allocates capacity and leaves size() at zero, so the
    // entries must be appended. Writing through operator[] here would be
    // undefined behavior and would leave both vectors empty.
    emitInfo.projectNames.emplace_back(outputType->nameOf(mapId));
    emitInfo.expressions.emplace_back(
        std::make_shared<core::FieldAccessTypedExpr>(outputType->childAt(mapId), outputType->nameOf(mapId)));
  }
  return emitInfo;
}

// Get the lowest value for numeric type.
template <typename T>
T getLowest() {
  using Limits = std::numeric_limits<T>;
  return Limits::lowest();
}

// Get the lowest value for string, which is the empty string.
template <>
std::string getLowest<std::string>() {
  return std::string{};
}

// Get the max value for numeric type.
template <typename T>
T getMax() {
  using Limits = std::numeric_limits<T>;
  return Limits::max();
}

// The max value will be used in BytesRange. Return empty string here instead.
template <>
std::string getMax<std::string>() {
  return std::string{};
}

// Substrait function names.
// Spellings of the null-check, comparison and boolean functions.
// NOTE(review): presumably compared against parsed Substrait function names
// elsewhere in this file; confirm against the callers.
const std::string sIsNotNull = "is_not_null";
const std::string sIsNull = "is_null";
const std::string sGte = "gte";
const std::string sGt = "gt";
const std::string sLte = "lte";
const std::string sLt = "lt";
const std::string sEqual = "equal";
const std::string sOr = "or";
const std::string sNot = "not";

// Substrait types.
// Spellings of the 32-bit and 64-bit integer type names.
const std::string sI32 = "i32";
const std::string sI64 = "i64";

/// @brief Build the combined input row type of a join: all columns of the
/// left side followed by all columns of the right side.
/// @param leftNode the plan node of left side.
/// @param rightNode the plan node of right side.
/// @return the concatenated row type.
RowTypePtr getJoinInputType(const core::PlanNodePtr& leftNode, const core::PlanNodePtr& rightNode) {
  const auto totalColumns = leftNode->outputType()->size() + rightNode->outputType()->size();

  std::vector<std::string> combinedNames;
  std::vector<std::shared_ptr<const Type>> combinedTypes;
  combinedNames.reserve(totalColumns);
  combinedTypes.reserve(totalColumns);

  // Appends the names and child types of one side to the combined lists.
  const auto appendColumns = [&combinedNames, &combinedTypes](const RowTypePtr& rowType) {
    const auto& sideNames = rowType->names();
    const auto& sideTypes = rowType->children();
    combinedNames.insert(combinedNames.end(), sideNames.begin(), sideNames.end());
    combinedTypes.insert(combinedTypes.end(), sideTypes.begin(), sideTypes.end());
  };
  appendColumns(leftNode->outputType());
  appendColumns(rightNode->outputType());

  return std::make_shared<const RowType>(std::move(combinedNames), std::move(combinedTypes));
}

/// @brief Get the direct output type of join.
///
/// Joins that keep both sides output the concatenation of both inputs. Right
/// semi joins output only right columns; left semi and anti joins output only
/// left columns. The semi "project" variants append a BOOLEAN column named
/// "exists" to the columns of the side they keep.
/// @param leftNode the plan node of left side.
/// @param rightNode the plan node of right side.
/// @param joinType the join type.
/// @return the output type.
RowTypePtr getJoinOutputType(
    const core::PlanNodePtr& leftNode,
    const core::PlanNodePtr& rightNode,
    const core::JoinType& joinType) {
  // Right semi joins never expose left columns.
  const bool leftColumnsExcluded = core::isRightSemiFilterJoin(joinType) || core::isRightSemiProjectJoin(joinType);

  // Left semi and anti joins never expose right columns.
  const bool rightColumnsExcluded =
      core::isLeftSemiFilterJoin(joinType) || core::isLeftSemiProjectJoin(joinType) || core::isAntiJoin(joinType);

  if (!leftColumnsExcluded && !rightColumnsExcluded) {
    return getJoinInputType(leftNode, rightNode);
  }

  // Copies 'rowType' and appends the BOOLEAN "exists" column.
  const auto withExistsColumn = [](const RowTypePtr& rowType) -> RowTypePtr {
    std::vector<std::string> outputNames = rowType->names();
    std::vector<std::shared_ptr<const Type>> outputTypes = rowType->children();
    outputNames.emplace_back("exists");
    outputTypes.emplace_back(BOOLEAN());
    return std::make_shared<const RowType>(std::move(outputNames), std::move(outputTypes));
  };

  if (!leftColumnsExcluded) {
    if (core::isLeftSemiProjectJoin(joinType)) {
      return withExistsColumn(leftNode->outputType());
    }
    return leftNode->outputType();
  }

  if (!rightColumnsExcluded) {
    if (core::isRightSemiProjectJoin(joinType)) {
      return withExistsColumn(rightNode->outputType());
    }
    return rightNode->outputType();
  }

  VELOX_FAIL("Output should include left or right columns.");
}
} // namespace

/// Applies the emit kind of 'relCommon' to 'noEmitNode'.
/// @param relCommon the RelCommon carrying the emit kind.
/// @param noEmitNode the plan node built without considering emit.
/// @return 'noEmitNode' itself for the direct kind, a ProjectNode on top of it
/// that selects the mapped columns for the emit kind; fails for any other
/// kind.
core::PlanNodePtr SubstraitToVeloxPlanConverter::processEmit(
    const ::substrait::RelCommon& relCommon,
    const core::PlanNodePtr& noEmitNode) {
  const auto emitKind = relCommon.emit_kind_case();

  if (emitKind == ::substrait::RelCommon::EmitKindCase::kDirect) {
    return noEmitNode;
  }

  if (emitKind == ::substrait::RelCommon::EmitKindCase::kEmit) {
    EmitInfo info = getEmitInfo(relCommon, noEmitNode);
    return std::make_shared<core::ProjectNode>(
        nextPlanNodeId(), std::move(info.projectNames), std::move(info.expressions), noEmitNode);
  }

  VELOX_FAIL("unrecognized emit kind");
}

/// Chooses the step of the aggregation node.
/// @param aggRel the Substrait aggregate relation.
/// @return kPartial when the advanced extension of 'aggRel' has "allowFlush="
/// set, kSingle otherwise.
core::AggregationNode::Step SubstraitToVeloxPlanConverter::toAggregationStep(const ::substrait::AggregateRel& aggRel) {
  // TODO Simplify Velox's aggregation steps
  const bool flushAllowed = aggRel.has_advanced_extension() &&
      SubstraitParser::configSetInOptimization(aggRel.advanced_extension(), "allowFlush=");
  return flushAllowed ? core::AggregationNode::Step::kPartial : core::AggregationNode::Step::kSingle;
}

/// Get aggregation function step for AggregateFunction.
/// The returned step value will be used to decide which Velox aggregate function or companion function
/// is used for the actual data processing.
/// @param sAggFuc the Substrait aggregate function whose phase is read.
/// @return the Velox step matching the phase; fails when the phase is
/// unspecified or not one of the four known phases.
core::AggregationNode::Step SubstraitToVeloxPlanConverter::toAggregationFunctionStep(
    const ::substrait::AggregateFunction& sAggFuc) {
  using Step = core::AggregationNode::Step;
  const auto phase = sAggFuc.phase();

  if (phase == ::substrait::AGGREGATION_PHASE_UNSPECIFIED) {
    VELOX_FAIL("Aggregation phase not specified.");
  }
  if (phase == ::substrait::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE) {
    return Step::kPartial;
  }
  if (phase == ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE) {
    return Step::kIntermediate;
  }
  if (phase == ::substrait::AGGREGATION_PHASE_INITIAL_TO_RESULT) {
    return Step::kSingle;
  }
  if (phase == ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT) {
    return Step::kFinal;
  }
  VELOX_FAIL("Unexpected aggregation phase.");
}

/// Builds the name of the aggregate function to call for a given step by
/// appending a step-specific suffix to 'baseName'.
/// @param baseName the base aggregate function name.
/// @param step the aggregation step.
/// @return 'baseName' plus "_partial", "_merge_extract" or "_merge" for the
/// partial, final and intermediate steps; 'baseName' unchanged for the single
/// step. Fails for any other step.
std::string SubstraitToVeloxPlanConverter::toAggregationFunctionName(
    const std::string& baseName,
    const core::AggregationNode::Step& step) {
  using Step = core::AggregationNode::Step;
  switch (step) {
    case Step::kPartial:
      return baseName + "_partial";
    case Step::kFinal:
      return baseName + "_merge_extract";
    case Step::kIntermediate:
      return baseName + "_merge";
    case Step::kSingle:
      return baseName;
    default:
      VELOX_FAIL("Unexpected aggregation node step.");
  }
}

/// Converts a Substrait JoinRel into a Velox join node.
///
/// Both inputs are converted first. The Substrait join type is then mapped to
/// a Velox join type, with the "isExistenceJoin=" and "isNullAwareAntiJoin="
/// settings of the advanced extension refining semi and anti joins. The join
/// keys and the optional post-join filter are resolved against the
/// concatenated left and right columns. A MergeJoinNode is created when
/// "isSMJ=" is set, a HashJoinNode otherwise.
/// @param sJoin the Substrait join relation; must have both left and right.
/// @return the created join plan node.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::JoinRel& sJoin) {
  if (!sJoin.has_left()) {
    VELOX_FAIL("Left Rel is expected in JoinRel.");
  }
  if (!sJoin.has_right()) {
    VELOX_FAIL("Right Rel is expected in JoinRel.");
  }

  auto leftNode = toVeloxPlan(sJoin.left());
  auto rightNode = toVeloxPlan(sJoin.right());

  // True when the join has an advanced extension in which 'key' is set.
  const auto hintIsSet = [&sJoin](const char* key) {
    return sJoin.has_advanced_extension() &&
        SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), key);
  };

  // Map join type.
  core::JoinType joinType;
  bool isNullAwareAntiJoin = false;
  switch (sJoin.type()) {
    case ::substrait::JoinRel_JoinType_JOIN_TYPE_INNER:
      joinType = core::JoinType::kInner;
      break;
    case ::substrait::JoinRel_JoinType_JOIN_TYPE_OUTER:
      joinType = core::JoinType::kFull;
      break;
    case ::substrait::JoinRel_JoinType_JOIN_TYPE_LEFT:
      joinType = core::JoinType::kLeft;
      break;
    case ::substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT:
      joinType = core::JoinType::kRight;
      break;
    case ::substrait::JoinRel_JoinType_JOIN_TYPE_LEFT_SEMI:
      // An existence join projects a flag column; otherwise rows are filtered.
      joinType = hintIsSet("isExistenceJoin=") ? core::JoinType::kLeftSemiProject : core::JoinType::kLeftSemiFilter;
      break;
    case ::substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT_SEMI:
      // An existence join projects a flag column; otherwise rows are filtered.
      joinType = hintIsSet("isExistenceJoin=") ? core::JoinType::kRightSemiProject : core::JoinType::kRightSemiFilter;
      break;
    case ::substrait::JoinRel_JoinType_JOIN_TYPE_ANTI:
      // Null-awareness is only passed to the hash join node below.
      isNullAwareAntiJoin = hintIsSet("isNullAwareAntiJoin=");
      joinType = core::JoinType::kAnti;
      break;
    default:
      VELOX_NYI("Unsupported Join type: {}", std::to_string(sJoin.type()));
  }

  // Extract the join key field references from the join expression.
  std::vector<const ::substrait::Expression::FieldReference*> leftExprs, rightExprs;
  extractJoinKeys(sJoin.expression(), leftExprs, rightExprs);
  VELOX_CHECK_EQ(leftExprs.size(), rightExprs.size());
  const size_t keyCount = leftExprs.size();

  // Keys and filter are resolved against left columns followed by right
  // columns.
  const auto inputRowType = getJoinInputType(leftNode, rightNode);
  std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>> leftKeys, rightKeys;
  leftKeys.reserve(keyCount);
  rightKeys.reserve(keyCount);
  for (size_t keyIdx = 0; keyIdx < keyCount; ++keyIdx) {
    leftKeys.emplace_back(exprConverter_->toVeloxExpr(*leftExprs[keyIdx], inputRowType));
    rightKeys.emplace_back(exprConverter_->toVeloxExpr(*rightExprs[keyIdx], inputRowType));
  }

  core::TypedExprPtr filter;
  if (sJoin.has_post_join_filter()) {
    filter = exprConverter_->toVeloxExpr(sJoin.post_join_filter(), inputRowType);
  }

  if (hintIsSet("isSMJ=")) {
    // Sort-merge join requested.
    return std::make_shared<core::MergeJoinNode>(
        nextPlanNodeId(),
        joinType,
        leftKeys,
        rightKeys,
        filter,
        leftNode,
        rightNode,
        getJoinOutputType(leftNode, rightNode, joinType));
  }

  // Hash join is the default.
  return std::make_shared<core::HashJoinNode>(
      nextPlanNodeId(),
      joinType,
      isNullAwareAntiJoin,
      leftKeys,
      rightKeys,
      filter,
      leftNode,
      rightNode,
      getJoinOutputType(leftNode, rightNode, joinType));
}

/// Converts a Substrait CrossRel into a Velox NestedLoopJoinNode.
///
/// The join type is always inner. The optional expression of the relation is
/// resolved against the concatenated left and right columns and passed on as
/// the join condition.
/// @param crossRel the Substrait cross relation; must have both left and right.
/// @return the created nested loop join plan node.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::CrossRel& crossRel) {
  if (!crossRel.has_left()) {
    VELOX_FAIL("Left Rel is expected in CrossRel.");
  }
  if (!crossRel.has_right()) {
    VELOX_FAIL("Right Rel is expected in CrossRel.");
  }

  auto leftNode = toVeloxPlan(crossRel.left());
  auto rightNode = toVeloxPlan(crossRel.right());

  const auto combinedInputType = getJoinInputType(leftNode, rightNode);

  // Left null when the relation carries no expression.
  core::TypedExprPtr joinConditions;
  if (crossRel.has_expression()) {
    joinConditions = exprConverter_->toVeloxExpr(crossRel.expression(), combinedInputType);
  }

  const core::JoinType joinType = core::JoinType::kInner;
  return std::make_shared<core::NestedLoopJoinNode>(
      nextPlanNodeId(),
      joinType,
      joinConditions,
      leftNode,
      rightNode,
      getJoinOutputType(leftNode, rightNode, joinType));
}

/// Converts a Substrait AggregateRel into a Velox AggregationNode.
///
/// Grouping expressions become grouping keys and each measure becomes one
/// aggregate, whose function name is derived from the measure's phase. When
/// "isStreaming=" is set in the advanced extension, the grouping keys are also
/// passed as pre-grouped keys. If the relation has a RelCommon, its emit is
/// applied on top of the aggregation node.
/// @param aggRel the Substrait aggregate relation.
/// @return the aggregation node, or the node produced by processEmit() on it.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::AggregateRel& aggRel) {
  auto childNode = convertSingleInput<::substrait::AggregateRel>(aggRel);
  core::AggregationNode::Step aggStep = toAggregationStep(aggRel);
  const auto& inputType = childNode->outputType();
  std::vector<core::FieldAccessTypedExprPtr> veloxGroupingExprs;

  // Get the grouping expressions.
  for (const auto& grouping : aggRel.groupings()) {
    for (const auto& groupingExpr : grouping.grouping_expressions()) {
      // Velox's groupings are limited to be Field.
      veloxGroupingExprs.emplace_back(exprConverter_->toVeloxExpr(groupingExpr.selection(), inputType));
    }
  }

  // Parse measures and get the aggregate expressions.
  // Each measure represents one aggregate expression.
  std::vector<core::AggregationNode::Aggregate> aggregates;
  aggregates.reserve(aggRel.measures().size());

  for (const auto& measure : aggRel.measures()) {
    // Get Aggregation Masks.
    core::FieldAccessTypedExprPtr mask;
    if (measure.has_filter()) {
      // Bind by reference: copying the protobuf Expression would deep-copy the
      // filter for every measure.
      const auto& substraitAggMask = measure.filter();
      if (substraitAggMask.ByteSizeLong() > 0) {
        // NOTE(review): a filter that does not convert to a field access
        // leaves 'mask' null and the aggregate unmasked. Confirm that callers
        // only pass field references here.
        mask = std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(
            exprConverter_->toVeloxExpr(substraitAggMask, inputType));
      }
    }

    const auto& aggFunction = measure.measure();
    auto baseFuncName = SubstraitParser::findVeloxFunction(functionMap_, aggFunction.function_reference());
    auto funcName = toAggregationFunctionName(baseFuncName, toAggregationFunctionStep(aggFunction));
    std::vector<core::TypedExprPtr> aggParams;
    aggParams.reserve(aggFunction.arguments().size());
    for (const auto& arg : aggFunction.arguments()) {
      aggParams.emplace_back(exprConverter_->toVeloxExpr(arg.value(), inputType));
    }
    auto aggVeloxType = SubstraitParser::parseType(aggFunction.output_type());
    auto aggExpr = std::make_shared<const core::CallTypedExpr>(aggVeloxType, std::move(aggParams), funcName);

    std::vector<TypePtr> rawInputTypes =
        SubstraitParser::sigToTypes(SubstraitParser::findFunctionSpec(functionMap_, aggFunction.function_reference()));
    aggregates.emplace_back(core::AggregationNode::Aggregate{aggExpr, rawInputTypes, mask, {}, {}});
  }

  bool ignoreNullKeys = false;
  std::vector<core::FieldAccessTypedExprPtr> preGroupingExprs;
  if (aggRel.has_advanced_extension() &&
      SubstraitParser::configSetInOptimization(aggRel.advanced_extension(), "isStreaming=")) {
    // Streaming aggregation: every grouping key is also a pre-grouped key.
    preGroupingExprs = veloxGroupingExprs;
  }

  // Get the output names of Aggregation. Aggregate outputs are numbered right
  // after the grouping keys. The bounds are computed as int up front so the
  // loop does not compare a signed index against an unsigned sum.
  const int firstAggIdx = static_cast<int>(veloxGroupingExprs.size());
  const int numMeasures = static_cast<int>(aggRel.measures().size());
  std::vector<std::string> aggOutNames;
  aggOutNames.reserve(numMeasures);
  for (int offset = 0; offset < numMeasures; ++offset) {
    aggOutNames.emplace_back(SubstraitParser::makeNodeName(planNodeId_, firstAggIdx + offset));
  }

  auto aggregationNode = std::make_shared<core::AggregationNode>(
      nextPlanNodeId(),
      aggStep,
      veloxGroupingExprs,
      preGroupingExprs,
      aggOutNames,
      aggregates,
      ignoreNullKeys,
      childNode);

  if (aggRel.has_common()) {
    return processEmit(aggRel.common(), std::move(aggregationNode));
  } else {
    return aggregationNode;
  }
}

/// Converts a Substrait ProjectRel into a Velox ProjectNode.
///
/// The candidate output is every input column followed by every project
/// expression. When the relation has a RelCommon, the emit output mapping
/// selects and orders columns from that candidate list; otherwise the whole
/// list is projected.
/// @param projectRel the Substrait project relation.
/// @return the created project plan node.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ProjectRel& projectRel) {
  auto childNode = convertSingleInput<::substrait::ProjectRel>(projectRel);
  // Construct Velox Expressions.
  const auto& projectExprs = projectRel.expressions();
  std::vector<std::string> projectNames;
  std::vector<core::TypedExprPtr> expressions;
  projectNames.reserve(projectExprs.size());
  expressions.reserve(projectExprs.size());

  const auto& inputType = childNode->outputType();
  int colIdx = 0;
  // Note that Substrait projection adds the project expressions on top of the
  // input to the projection node. Thus we need to add the input columns first
  // and then add the projection expressions.

  // First, adding the project names and expressions from the input to
  // the project node.
  for (uint32_t idx = 0; idx < inputType->size(); idx++) {
    const auto& fieldName = inputType->nameOf(idx);
    projectNames.emplace_back(fieldName);
    expressions.emplace_back(std::make_shared<core::FieldAccessTypedExpr>(inputType->childAt(idx), fieldName));
    colIdx += 1;
  }

  // Then, adding project expression related project names and expressions.
  for (const auto& expr : projectExprs) {
    expressions.emplace_back(exprConverter_->toVeloxExpr(expr, inputType));
    projectNames.emplace_back(SubstraitParser::makeNodeName(planNodeId_, colIdx));
    colIdx += 1;
  }

  if (projectRel.has_common()) {
    // Bind by reference: copying RelCommon would deep-copy the protobuf
    // message.
    // NOTE(review): a RelCommon without an emit has an empty output mapping
    // and therefore projects zero columns here. Confirm this is intended.
    const auto& emit = projectRel.common().emit();
    const int emitSize = emit.output_mapping_size();
    std::vector<std::string> emitProjectNames(emitSize);
    std::vector<core::TypedExprPtr> emitExpressions(emitSize);
    for (int i = 0; i < emitSize; i++) {
      const int32_t mapId = emit.output_mapping(i);
      // The mapping comes from the plan; reject ids that would index outside
      // the candidate columns instead of reading out of bounds.
      VELOX_CHECK(
          mapId >= 0 && static_cast<size_t>(mapId) < projectNames.size(),
          "Emit output mapping {} is out of range for {} project columns.",
          mapId,
          projectNames.size());
      emitProjectNames[i] = projectNames[mapId];
      emitExpressions[i] = expressions[mapId];
    }
    return std::make_shared<core::ProjectNode>(
        nextPlanNodeId(), std::move(emitProjectNames), std::move(emitExpressions), std::move(childNode));
  } else {
    return std::make_shared<core::ProjectNode>(
        nextPlanNodeId(), std::move(projectNames), std::move(expressions), std::move(childNode));
  }
}

/// Creates a Hive LocationHandle.
/// @param targetDirectory the target directory of the handle.
/// @param writeDirectory the write directory; 'targetDirectory' is used when
/// it is not provided.
/// @param tableType the table type of the handle.
/// @return the created location handle.
std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
    const std::string& targetDirectory,
    const std::optional<std::string>& writeDirectory = std::nullopt,
    const connector::hive::LocationHandle::TableType& tableType =
        connector::hive::LocationHandle::TableType::kExisting) {
  const std::string& effectiveWriteDirectory = writeDirectory.has_value() ? *writeDirectory : targetDirectory;
  return std::make_shared<connector::hive::LocationHandle>(targetDirectory, effectiveWriteDirectory, tableType);
}

/// Creates a HiveInsertTableHandle describing the columns of a table write.
///
/// Every table column gets a HiveColumnHandle: a partition key handle when its
/// name appears in 'partitionedBy', a regular handle otherwise. The function
/// checks that the number of partition, bucket and sorting columns found among
/// the table columns equals the number requested.
/// @param tableColumnNames names of the table columns.
/// @param tableColumnTypes types of the table columns, parallel to the names.
/// @param partitionedBy names of the partition columns.
/// @param bucketProperty optional bucket property; may be null.
/// @param locationHandle location to write to.
/// @param tableStorageFormat storage format of the table.
/// @param compressionKind optional compression kind.
/// @return the created insert table handle.
std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandle(
    const std::vector<std::string>& tableColumnNames,
    const std::vector<TypePtr>& tableColumnTypes,
    const std::vector<std::string>& partitionedBy,
    const std::shared_ptr<connector::hive::HiveBucketProperty>& bucketProperty,
    const std::shared_ptr<connector::hive::LocationHandle>& locationHandle,
    const dwio::common::FileFormat& tableStorageFormat = dwio::common::FileFormat::PARQUET,
    const std::optional<common::CompressionKind>& compressionKind = {}) {
  using ColumnHandle = connector::hive::HiveColumnHandle;

  std::vector<std::string> bucketedBy;
  std::vector<TypePtr> bucketedTypes;
  std::vector<std::shared_ptr<const connector::hive::HiveSortingColumn>> sortedBy;
  if (bucketProperty != nullptr) {
    bucketedBy = bucketProperty->bucketedBy();
    bucketedTypes = bucketProperty->bucketedTypes();
    sortedBy = bucketProperty->sortedBy();
  }

  std::vector<std::shared_ptr<const ColumnHandle>> columnHandles;
  columnHandles.reserve(tableColumnNames.size());

  int32_t numPartitionColumns{0};
  int32_t numSortingColumns{0};
  int32_t numBucketColumns{0};
  for (size_t col = 0; col < tableColumnNames.size(); ++col) {
    const auto& columnName = tableColumnNames[col];

    // Count how often this column is referenced as a bucket or sorting column.
    for (const auto& bucketColumn : bucketedBy) {
      if (bucketColumn == columnName) {
        ++numBucketColumns;
      }
    }
    for (const auto& sortingColumn : sortedBy) {
      if (sortingColumn->sortColumn() == columnName) {
        ++numSortingColumns;
      }
    }

    const bool isPartitionKey =
        std::find(partitionedBy.cbegin(), partitionedBy.cend(), columnName) != partitionedBy.cend();
    if (isPartitionKey) {
      ++numPartitionColumns;
    }

    // at() keeps the bounds check on the parallel type list.
    const auto& columnDataType = tableColumnTypes.at(col);
    columnHandles.emplace_back(std::make_shared<ColumnHandle>(
        columnName,
        isPartitionKey ? ColumnHandle::ColumnType::kPartitionKey : ColumnHandle::ColumnType::kRegular,
        columnDataType,
        columnDataType));
  }

  VELOX_CHECK_EQ(numPartitionColumns, partitionedBy.size());
  VELOX_CHECK_EQ(numBucketColumns, bucketedBy.size());
  VELOX_CHECK_EQ(numSortingColumns, sortedBy.size());
  return std::make_shared<connector::hive::HiveInsertTableHandle>(
      columnHandles, locationHandle, tableStorageFormat, bucketProperty, compressionKind);
}

/// Converts a Substrait WriteRel into a Velox TableWriteNode that writes
/// Parquet through the Hive connector.
///
/// Column names and partition columns come from the table schema of the
/// relation. The write path is 'writeFilesTempPath_' when set; otherwise an
/// empty path is used, which is only accepted in validation mode. The
/// compression codec defaults to snappy and can be overridden through the
/// advanced extension of the named table.
/// @param writeRel the Substrait write relation; must have an input and a
/// table schema.
/// @return the created table write plan node.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WriteRel& writeRel) {
  if (!writeRel.has_input()) {
    VELOX_FAIL("Child Rel is expected in WriteRel.");
  }
  core::PlanNodePtr childNode = toVeloxPlan(writeRel.input());
  const auto& inputType = childNode->outputType();

  VELOX_CHECK(writeRel.has_table_schema(), "WriteRel should have the table schema to store the column information");
  const auto& tableSchema = writeRel.table_schema();
  std::vector<bool> isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema);

  // Collect all column names and, among them, the partition columns.
  const int numColumns = tableSchema.names_size();
  std::vector<std::string> tableColumnNames;
  std::vector<std::string> partitionedKey;
  tableColumnNames.reserve(numColumns);
  for (int i = 0; i < numColumns; i++) {
    tableColumnNames.emplace_back(tableSchema.names(i));
    if (isPartitionColumns[i]) {
      partitionedKey.emplace_back(tableColumnNames[i]);
    }
  }

  std::string writePath;
  if (writeFilesTempPath_.has_value()) {
    writePath = writeFilesTempPath_.value();
  } else {
    VELOX_CHECK(validationMode_, "WriteRel should have the write path before initializing the plan.");
    writePath = "";
  }

  // spark default compression code is snappy.
  common::CompressionKind compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
  if (writeRel.named_table().has_advanced_extension()) {
    // Settings are probed in this order and the first one that is set wins.
    struct CodecSetting {
      const char* key;
      common::CompressionKind kind;
    };
    const CodecSetting codecSettings[] = {
        {"isSnappy=", common::CompressionKind::CompressionKind_SNAPPY},
        {"isGzip=", common::CompressionKind::CompressionKind_GZIP},
        {"isLzo=", common::CompressionKind::CompressionKind_LZO},
        {"isLz4=", common::CompressionKind::CompressionKind_LZ4},
        {"isZstd=", common::CompressionKind::CompressionKind_ZSTD},
        {"isNone=", common::CompressionKind::CompressionKind_NONE},
        {"isUncompressed=", common::CompressionKind::CompressionKind_NONE},
    };
    for (const auto& setting : codecSettings) {
      if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), setting.key)) {
        compressionCodec = setting.kind;
        break;
      }
    }
  }

  // TODO: Do not hard-code connector ID and allow for connectors other than Hive.
  static const std::string kHiveConnectorId = "test-hive";

  // The table column names are used instead of inputType->names() because the
  // input column names are different.
  auto hiveTableHandle = makeHiveInsertTableHandle(
      tableColumnNames,
      inputType->children(),
      partitionedKey,
      nullptr /*bucketProperty*/,
      makeLocationHandle(writePath),
      dwio::common::FileFormat::PARQUET, // Currently only support parquet format.
      compressionCodec);

  const bool hasPartitions = !partitionedKey.empty();
  return std::make_shared<core::TableWriteNode>(
      nextPlanNodeId(),
      inputType,
      tableColumnNames,
      nullptr, /*aggregationNode*/
      std::make_shared<core::InsertTableHandle>(kHiveConnectorId, hiveTableHandle),
      hasPartitions,
      exec::TableWriteTraits::outputType(nullptr),
      connector::CommitStrategy::kNoCommit,
      childNode);
}

/// Converts a Substrait ExpandRel into a Velox ExpandNode.
///
/// Every field of the relation yields one projection set made of field
/// references and literals. The output column names are generated from the
/// number of projections in the first set.
/// @param expandRel the Substrait expand relation; must have an input and at
/// least one field.
/// @return the created expand plan node.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ExpandRel& expandRel) {
  core::PlanNodePtr childNode;
  if (expandRel.has_input()) {
    childNode = toVeloxPlan(expandRel.input());
  } else {
    VELOX_FAIL("Child Rel is expected in ExpandRel.");
  }

  // The first projection set is read below to size the output, so an empty
  // field list must be rejected instead of indexing past the end.
  VELOX_CHECK_GT(expandRel.fields_size(), 0, "At least one projection set is expected in ExpandRel.");

  const auto& inputType = childNode->outputType();

  std::vector<std::vector<core::TypedExprPtr>> projectSetExprs;
  projectSetExprs.reserve(expandRel.fields_size());

  for (const auto& projections : expandRel.fields()) {
    std::vector<core::TypedExprPtr> projectExprs;
    projectExprs.reserve(projections.switching_field().duplicates_size());

    for (const auto& projectExpr : projections.switching_field().duplicates()) {
      if (projectExpr.has_selection()) {
        projectExprs.emplace_back(exprConverter_->toVeloxExpr(projectExpr.selection(), inputType));
      } else if (projectExpr.has_literal()) {
        projectExprs.emplace_back(exprConverter_->toVeloxExpr(projectExpr.literal()));
      } else {
        VELOX_FAIL("The project in Expand Operator only support field or literal.");
      }
    }
    // The set is not used again in this iteration, so move it instead of
    // copying it.
    projectSetExprs.emplace_back(std::move(projectExprs));
  }

  const auto projectSize = expandRel.fields()[0].switching_field().duplicates_size();
  std::vector<std::string> names;
  names.reserve(projectSize);
  for (int idx = 0; idx < projectSize; idx++) {
    names.push_back(SubstraitParser::makeNodeName(planNodeId_, idx));
  }

  return std::make_shared<core::ExpandNode>(nextPlanNodeId(), projectSetExprs, std::move(names), childNode);
}

/// Converts a Substrait GenerateRel into a Velox UnnestNode.
///
/// The required child outputs become the replicated columns and must be field
/// accesses. The unnest column is the last projection of the child when the
/// child is a ProjectNode with more outputs than required child outputs;
/// otherwise it is the first argument of the generator's scalar function,
/// which must be a field access. Output names are "C<n>": one for an ARRAY
/// column, two for a MAP column.
/// @param generateRel the Substrait generate relation; must have an input.
/// @return the created unnest plan node.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::GenerateRel& generateRel) {
  core::PlanNodePtr childNode;
  if (generateRel.has_input()) {
    childNode = toVeloxPlan(generateRel.input());
  } else {
    VELOX_FAIL("Child Rel is expected in GenerateRel.");
  }
  const auto& inputType = childNode->outputType();

  std::vector<core::FieldAccessTypedExprPtr> replicated;
  std::vector<core::FieldAccessTypedExprPtr> unnest;

  const auto& generator = generateRel.generator();
  const auto& requiredChildOutput = generateRel.child_output();

  replicated.reserve(requiredChildOutput.size());
  for (const auto& output : requiredChildOutput) {
    // One checked downcast serves both the validation and the stored pointer.
    auto fieldExpr = std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(
        exprConverter_->toVeloxExpr(output, inputType));
    VELOX_CHECK(fieldExpr != nullptr, " the output in Generate Operator only support field");
    replicated.emplace_back(std::move(fieldExpr));
  }

  auto projNode = std::dynamic_pointer_cast<const core::ProjectNode>(childNode);

  if (projNode != nullptr && projNode->names().size() > requiredChildOutput.size()) {
    // generator is a scalarfunction node -> explode(array(col, 'all'))
    // use the last one, this is ensure by scala code
    const auto& innerName = projNode->names().back();
    const auto& innerExpr = projNode->projections().back();
    unnest.emplace_back(std::make_shared<core::FieldAccessTypedExpr>(innerExpr->type(), innerName));
  } else {
    // generator should be a array column -> explode(col)
    // Bind by reference: copying the scalar function would deep-copy the
    // protobuf message.
    const auto& explodeFunc = generator.scalar_function();
    VELOX_CHECK_GT(explodeFunc.arguments_size(), 0, "The generator in Generate Operator requires an argument.");
    auto unnestExpr = exprConverter_->toVeloxExpr(explodeFunc.arguments(0).value(), inputType);
    auto unnestFieldExpr = std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(unnestExpr);
    VELOX_CHECK_NOT_NULL(unnestFieldExpr, " the key in unnest Operator only support field");
    unnest.emplace_back(unnestFieldExpr);
  }

  // TODO(yuan): get from generator output
  std::vector<std::string> unnestNames;
  int unnestIndex = 0;
  for (const auto& variable : unnest) {
    if (variable->type()->isArray()) {
      unnestNames.emplace_back(fmt::format("C{}", unnestIndex++));
    } else if (variable->type()->isMap()) {
      // A map gets two names in a row.
      unnestNames.emplace_back(fmt::format("C{}", unnestIndex++));
      unnestNames.emplace_back(fmt::format("C{}", unnestIndex++));
    } else {
      VELOX_FAIL(
          "Unexpected type of unnest variable. Expected ARRAY or MAP, but got {}.", variable->type()->toString());
    }
  }

  auto node = std::make_shared<core::UnnestNode>(
      nextPlanNodeId(), replicated, unnest, std::move(unnestNames), std::nullopt, childNode);

  return node;
}

/// Builds a Velox window frame from the Substrait frame description.
///
/// @param lower_bound Substrait bound describing where the frame starts.
/// @param upper_bound Substrait bound describing where the frame ends.
/// @param type Substrait window type; only ROWS and RANGE are accepted.
/// @return A frame with its type, both bound types and both bound values set.
///         A bound value is a BIGINT constant holding the offset for
///         PRECEDING / FOLLOWING bounds and nullptr for every other bound.
/// Fails when the window type or a bound kind is not recognized.
const core::WindowNode::Frame createWindowFrame(
    const ::substrait::Expression_WindowFunction_Bound& lower_bound,
    const ::substrait::Expression_WindowFunction_Bound& upper_bound,
    const ::substrait::WindowType& type) {
  using Bound = ::substrait::Expression_WindowFunction_Bound;
  using BoundType = core::WindowNode::BoundType;

  core::WindowNode::Frame frame;
  switch (type) {
    case ::substrait::WindowType::ROWS:
      frame.type = core::WindowNode::WindowType::kRows;
      break;
    case ::substrait::WindowType::RANGE:
      frame.type = core::WindowNode::WindowType::kRange;
      break;
    default:
      // The placeholder is required, otherwise the offending type is dropped
      // from the error message.
      VELOX_FAIL("the window type only support ROWS and RANGE, and the input type is {}", std::to_string(type));
  }

  // Maps the populated member of the Substrait bound to the Velox bound type.
  // The bound is taken by reference to avoid copying the protobuf message.
  auto boundTypeConversion = [](const Bound& bound) -> BoundType {
    if (bound.has_current_row()) {
      return BoundType::kCurrentRow;
    } else if (bound.has_unbounded_following()) {
      return BoundType::kUnboundedFollowing;
    } else if (bound.has_unbounded_preceding()) {
      return BoundType::kUnboundedPreceding;
    } else if (bound.has_following()) {
      return BoundType::kFollowing;
    } else if (bound.has_preceding()) {
      return BoundType::kPreceding;
    } else {
      VELOX_FAIL("The BoundType is not supported.");
    }
  };

  // Extracts the literal offset of a bounded frame edge. Both PRECEDING and
  // FOLLOWING are handled for either edge, so frames such as
  // "BETWEEN 1 FOLLOWING AND 3 FOLLOWING" keep their start offset.
  // TODO: support non-literal expression.
  auto boundValueConversion = [](const Bound& bound, BoundType boundType) -> core::TypedExprPtr {
    switch (boundType) {
      case BoundType::kPreceding:
        return std::make_shared<core::ConstantTypedExpr>(BIGINT(), variant(bound.preceding().offset()));
      case BoundType::kFollowing:
        return std::make_shared<core::ConstantTypedExpr>(BIGINT(), variant(bound.following().offset()));
      default:
        return nullptr;
    }
  };

  frame.startType = boundTypeConversion(lower_bound);
  frame.startValue = boundValueConversion(lower_bound, frame.startType);
  frame.endType = boundTypeConversion(upper_bound);
  frame.endValue = boundValueConversion(upper_bound, frame.endType);
  return frame;
}

/// Converts a Substrait WindowRel into a Velox WindowNode.
///
/// Each measure of the rel becomes one window function. Partition keys are
/// de-duplicated by name, and sort keys that repeat an earlier sort key or a
/// partition key are dropped. The node is marked as having sorted inputs when
/// the advanced extension carries the "isStreaming=" optimization setting.
///
/// @param windowRel The Substrait window relation; it must have an input.
/// @return The WindowNode built on top of the converted input.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WindowRel& windowRel) {
  core::PlanNodePtr childNode;
  if (windowRel.has_input()) {
    childNode = toVeloxPlan(windowRel.input());
  } else {
    VELOX_FAIL("Child Rel is expected in WindowRel.");
  }

  const auto& inputType = childNode->outputType();

  // Parse measures and get the window expressions.
  // Each measure represents one window expression.
  std::vector<core::WindowNode::Function> windowNodeFunctions;
  std::vector<std::string> windowColumnNames;

  windowNodeFunctions.reserve(windowRel.measures().size());
  windowColumnNames.reserve(windowRel.measures().size());
  for (const auto& smea : windowRel.measures()) {
    const auto& windowFunction = smea.measure();
    std::string funcName = SubstraitParser::findVeloxFunction(functionMap_, windowFunction.function_reference());
    std::vector<core::TypedExprPtr> windowParams;
    const auto& argumentList = windowFunction.arguments();
    windowParams.reserve(argumentList.size());

    // ignoreNulls is a per-function setting, so it is reset for every measure.
    // Keeping it outside the loop would leak the value of an earlier offset
    // function into all the functions that follow it.
    bool ignoreNulls = false;

    // For functions in kOffsetWindowFunctions (see Spark OffsetWindowFunctions),
    // we expect the last arg is passed for setting ignoreNulls.
    if (kOffsetWindowFunctions.find(funcName) != kOffsetWindowFunctions.end()) {
      VELOX_CHECK_GT(argumentList.size(), 0, "The ignoreNulls argument is expected for window function {}.", funcName);
      const int lastArgIdx = argumentList.size() - 1;
      for (int i = 0; i < lastArgIdx; i++) {
        windowParams.emplace_back(exprConverter_->toVeloxExpr(argumentList[i].value(), inputType));
      }
      auto constantTypedExpr = exprConverter_->toVeloxExpr(argumentList[lastArgIdx].value().literal());
      auto ignoreNullsValue = constantTypedExpr->value();
      if (!ignoreNullsValue.hasValue()) {
        VELOX_FAIL("Value is expected in variant for setting ignoreNulls.");
      }
      ignoreNulls = ignoreNullsValue.value<bool>();
    } else {
      for (const auto& arg : argumentList) {
        windowParams.emplace_back(exprConverter_->toVeloxExpr(arg.value(), inputType));
      }
    }
    auto windowVeloxType = SubstraitParser::parseType(windowFunction.output_type());
    auto windowCall = std::make_shared<const core::CallTypedExpr>(windowVeloxType, std::move(windowParams), funcName);
    // The bounds are only read, so bind references instead of copying them.
    const auto& upperBound = windowFunction.upper_bound();
    const auto& lowerBound = windowFunction.lower_bound();
    auto type = windowFunction.window_type();

    windowColumnNames.push_back(windowFunction.column_name());

    windowNodeFunctions.push_back(
        {std::move(windowCall), createWindowFrame(lowerBound, upperBound, type), ignoreNulls});
  }

  // Construct partitionKeys
  std::vector<core::FieldAccessTypedExprPtr> partitionKeys;
  std::unordered_set<std::string> keyNames;
  const auto& partitions = windowRel.partition_expressions();
  partitionKeys.reserve(partitions.size());
  for (const auto& partition : partitions) {
    auto expression = exprConverter_->toVeloxExpr(partition, inputType);
    core::FieldAccessTypedExprPtr veloxPartitionKey =
        std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(expression);
    VELOX_USER_CHECK_NOT_NULL(veloxPartitionKey, "Window Operator only supports field partition key.");
    // Constructs unique partition keys.
    if (keyNames.insert(veloxPartitionKey->name()).second) {
      partitionKeys.emplace_back(veloxPartitionKey);
    }
  }

  std::vector<core::FieldAccessTypedExprPtr> sortingKeys;
  std::vector<core::SortOrder> sortingOrders;
  const auto& [rawSortingKeys, rawSortingOrders] = processSortField(windowRel.sorts(), inputType);
  for (size_t i = 0; i < rawSortingKeys.size(); ++i) {
    // Constructs unique sort keys and excludes keys overlapped with partition keys.
    if (keyNames.insert(rawSortingKeys[i]->name()).second) {
      sortingKeys.emplace_back(rawSortingKeys[i]);
      sortingOrders.emplace_back(rawSortingOrders[i]);
    }
  }

  // Streaming window expects its inputs to be sorted already.
  const bool inputsSorted = windowRel.has_advanced_extension() &&
      SubstraitParser::configSetInOptimization(windowRel.advanced_extension(), "isStreaming=");

  return std::make_shared<core::WindowNode>(
      nextPlanNodeId(),
      partitionKeys,
      sortingKeys,
      sortingOrders,
      windowColumnNames,
      windowNodeFunctions,
      inputsSorted,
      childNode);
}

/// Converts a Substrait SortRel into a non-partial Velox OrderByNode whose
/// keys and orders come from the rel's sort fields.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::SortRel& sortRel) {
  auto inputNode = convertSingleInput<::substrait::SortRel>(sortRel);

  // first: sorting keys, second: the matching sorting orders.
  auto keysAndOrders = processSortField(sortRel.sorts(), inputNode->outputType());

  const bool isPartial = false;
  return std::make_shared<core::OrderByNode>(
      nextPlanNodeId(), keysAndOrders.first, keysAndOrders.second, isPartial, inputNode);
}

/// Converts Substrait sort fields into parallel vectors of Velox sorting keys
/// and sorting orders.
///
/// @param sortFields The Substrait sort fields to convert.
/// @param inputType Row type used to resolve the sort expressions.
/// @return A pair of (keys, orders). The two vectors always have the same
///         length and entry i of one belongs to entry i of the other.
/// Fails when a sort direction is unsupported or when a sort expression is
/// not a plain field access.
std::pair<std::vector<core::FieldAccessTypedExprPtr>, std::vector<core::SortOrder>>
SubstraitToVeloxPlanConverter::processSortField(
    const ::google::protobuf::RepeatedPtrField<::substrait::SortField>& sortFields,
    const RowTypePtr& inputType) {
  std::vector<core::FieldAccessTypedExprPtr> sortingKeys;
  std::vector<core::SortOrder> sortingOrders;
  sortingKeys.reserve(sortFields.size());
  sortingOrders.reserve(sortFields.size());

  for (const auto& sort : sortFields) {
    if (!sort.has_expr()) {
      // A field without an expression contributes no key. Its order must be
      // skipped as well, otherwise every following order would be paired with
      // the wrong key.
      continue;
    }

    auto sortOrder = toSortOrder(sort);
    auto expression = exprConverter_->toVeloxExpr(sort.expr(), inputType);
    auto fieldExpr = std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(expression);
    VELOX_USER_CHECK_NOT_NULL(fieldExpr, "Sort Operator only supports field sorting key");
    sortingKeys.emplace_back(std::move(fieldExpr));
    sortingOrders.emplace_back(std::move(sortOrder));
  }
  // Move the vectors into the result instead of copying them.
  return {std::move(sortingKeys), std::move(sortingOrders)};
}

/// Converts a Substrait FilterRel into a Velox FilterNode. When the rel has a
/// common section, the node is additionally passed through processEmit.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::FilterRel& filterRel) {
  auto inputNode = convertSingleInput<::substrait::FilterRel>(filterRel);

  // The condition is resolved against the output of the input node.
  auto predicate = exprConverter_->toVeloxExpr(filterRel.condition(), inputNode->outputType());
  auto filterNode = std::make_shared<core::FilterNode>(nextPlanNodeId(), std::move(predicate), inputNode);

  if (!filterRel.has_common()) {
    return filterNode;
  }
  return processEmit(filterRel.common(), std::move(filterNode));
}

/// Converts a Substrait FetchRel into a Velox plan node.
///
/// When the input of the fetch is a SortRel, the two rels are fused into a
/// single non-partial TopNNode built on the input of the sort; in that case
/// the fetch offset must be 0. Otherwise a non-partial LimitNode is created
/// on top of the converted input.
///
/// @param fetchRel The Substrait fetch relation; it must have an input.
/// @return The TopNNode or LimitNode described above.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::FetchRel& fetchRel) {
  if (!fetchRel.has_input()) {
    VELOX_FAIL("Child Rel is expected in FetchRel.");
  }

  const auto& input = fetchRel.input();
  if (input.has_sort()) {
    // Sort followed by fetch becomes TopN. The SortRel is only read, so it is
    // bound by reference rather than copied together with its input subtree.
    const auto& sortRel = input.sort();
    auto childNode = toVeloxPlan(sortRel.input());
    auto [sortingKeys, sortingOrders] = processSortField(sortRel.sorts(), childNode->outputType());

    VELOX_CHECK_EQ(fetchRel.offset(), 0);

    return std::make_shared<core::TopNNode>(
        nextPlanNodeId(),
        sortingKeys,
        sortingOrders,
        static_cast<int32_t>(fetchRel.count()),
        false /*isPartial*/,
        childNode);
  }

  auto childNode = toVeloxPlan(input);
  return std::make_shared<core::LimitNode>(
      nextPlanNodeId(),
      static_cast<int32_t>(fetchRel.offset()),
      static_cast<int32_t>(fetchRel.count()),
      false /*isPartial*/,
      childNode);
}

/// Creates the plan node that reads from the input stream `streamIdx` and
/// registers a SplitInfo flagged as a stream for it.
///
/// @param readRel The ReadRel whose base schema, if present, gives the column
///        count and types of the stream.
/// @param streamIdx Index of the stream handed to the node factory.
/// @return The node produced by valueStreamNodeFactory_.
core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
    const ::substrait::ReadRel& readRel,
    int32_t streamIdx) {
  // Schema of the iterator. Without a base schema the row type stays empty.
  std::vector<TypePtr> columnTypes;
  uint64_t numColumns = 0;
  if (readRel.has_base_schema()) {
    const auto& schema = readRel.base_schema();
    // The names in the schema are not used: fresh names are generated below
    // because the ValueStreamNode in Velox does not support name change.
    numColumns = schema.names().size();
    columnTypes = SubstraitParser::parseNamedStruct(schema);
  }

  // Names are derived from the current plan node id, so they must be created
  // before nextPlanNodeId() advances it.
  std::vector<std::string> columnNames;
  columnNames.reserve(numColumns);
  for (int idx = 0; idx < numColumns; ++idx) {
    columnNames.emplace_back(SubstraitParser::makeNodeName(planNodeId_, idx));
  }

  auto rowType = ROW(std::move(columnNames), std::move(columnTypes));
  auto streamNode = valueStreamNodeFactory_(nextPlanNodeId(), pool_, streamIdx, rowType);

  auto streamSplitInfo = std::make_shared<SplitInfo>();
  streamSplitInfo->isStream = true;
  splitInfoMap_[streamNode->id()] = streamSplitInfo;
  return streamNode;
}

/// Converts a Substrait ReadRel into a Velox source node.
///
/// Depending on the rel, the result is one of:
///  - the node built by constructValueStreamNode, when getStreamIndex finds a
///    stream index for the rel;
///  - the node built by the (ReadRel, RowTypePtr) overload, when the rel has a
///    virtual table;
///  - a TableScanNode otherwise, with the rel's filter split into subfield
///    filters and a remaining filter on the table handle.
///
/// Emit is rejected for this rel. Outside of validation mode one entry of
/// splitInfos_ is consumed per call that reaches the table scan path.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ReadRel& readRel) {
  // emit is not allowed in TableScanNode and ValuesNode related
  // outputs
  if (readRel.has_common()) {
    VELOX_USER_CHECK(
        !readRel.common().has_emit(), "Emit not supported for ValuesNode and TableScanNode related Substrait plans.");
  }

  // Check if the ReadRel specifies an input of stream. If yes, build ValueStreamNode as the data source.
  auto streamIdx = getStreamIndex(readRel);
  if (streamIdx >= 0) {
    return constructValueStreamNode(readRel, streamIdx);
  }

  // Otherwise, will create TableScan node for ReadRel.
  // In validation mode no split info is consumed and a default-constructed
  // SplitInfo is used instead.
  auto splitInfo = std::make_shared<SplitInfo>();
  if (!validationMode_) {
    VELOX_CHECK_LT(splitInfoIdx_, splitInfos_.size(), "Plan must have readRel and related split info.");
    splitInfo = splitInfos_[splitInfoIdx_++];
  }

  // Get output names and types.
  std::vector<std::string> colNameList;
  std::vector<TypePtr> veloxTypeList;
  std::vector<bool> isPartitionColumns;
  // Convert field names into lower case when not case-sensitive.
  // kCaseSensitive defaults to false here, so names are lower-cased unless the
  // configuration explicitly enables case sensitivity.
  std::shared_ptr<const facebook::velox::Config> veloxCfg =
      std::make_shared<const facebook::velox::core::MemConfigMutable>(confMap_);
  bool asLowerCase = !veloxCfg->get<bool>(kCaseSensitive, false);
  if (readRel.has_base_schema()) {
    const auto& baseSchema = readRel.base_schema();
    colNameList.reserve(baseSchema.names().size());
    for (const auto& name : baseSchema.names()) {
      std::string fieldName = name;
      if (asLowerCase) {
        folly::toLowerAscii(fieldName);
      }
      colNameList.emplace_back(fieldName);
    }
    veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema, asLowerCase);
    isPartitionColumns = SubstraitParser::parsePartitionColumns(baseSchema);
  }

  // Do not hard-code connector ID and allow for connectors other than Hive.
  // NOTE(review): the ID below is still hard-coded; the line above reads like
  // an open TODO rather than a description of the code - confirm.
  static const std::string kHiveConnectorId = "test-hive";

  // Velox requires filter pushdown to be enabled.
  bool filterPushdownEnabled = true;
  std::shared_ptr<connector::hive::HiveTableHandle> tableHandle;
  if (!readRel.has_filter()) {
    // No filter: empty subfield filters and no remaining filter.
    tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
        kHiveConnectorId, "hive_table", filterPushdownEnabled, connector::hive::SubfieldFilters{}, nullptr);
  } else {
    // Flatten the conditions connected with 'and'.
    std::vector<::substrait::Expression_ScalarFunction> scalarFunctions;
    std::vector<::substrait::Expression_SingularOrList> singularOrLists;
    std::vector<::substrait::Expression_IfThen> ifThens;
    flattenConditions(readRel.filter(), scalarFunctions, singularOrLists, ifThens);

    // The vector's subscript stands for the column index.
    std::vector<RangeRecorder> rangeRecorders(veloxTypeList.size());

    // Separate the filters to be two parts. The subfield part can be
    // pushed down.
    std::vector<::substrait::Expression_ScalarFunction> subfieldFunctions;
    std::vector<::substrait::Expression_ScalarFunction> remainingFunctions;
    std::vector<::substrait::Expression_SingularOrList> subfieldOrLists;
    std::vector<::substrait::Expression_SingularOrList> remainingOrLists;

    separateFilters(
        rangeRecorders,
        scalarFunctions,
        subfieldFunctions,
        remainingFunctions,
        singularOrLists,
        subfieldOrLists,
        remainingOrLists,
        veloxTypeList,
        splitInfo->format);

    // Create subfield filters based on the constructed filter info map.
    auto subfieldFilters = createSubfieldFilters(colNameList, veloxTypeList, subfieldFunctions, subfieldOrLists);
    // Connect the remaining filters with 'and'.
    // The if-then conditions are always passed to the remaining filter.
    auto remainingFilter = connectWithAnd(colNameList, veloxTypeList, remainingFunctions, remainingOrLists, ifThens);

    tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
        kHiveConnectorId, "hive_table", filterPushdownEnabled, std::move(subfieldFilters), remainingFilter);
  }

  // Get assignments and out names.
  // Output names are generated from planNodeId_ and the column index; this
  // happens before nextPlanNodeId() is called for the node itself.
  std::vector<std::string> outNames;
  outNames.reserve(colNameList.size());
  std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>> assignments;
  for (int idx = 0; idx < colNameList.size(); idx++) {
    auto outName = SubstraitParser::makeNodeName(planNodeId_, idx);
    auto columnType = isPartitionColumns[idx] ? connector::hive::HiveColumnHandle::ColumnType::kPartitionKey
                                              : connector::hive::HiveColumnHandle::ColumnType::kRegular;
    assignments[outName] = std::make_shared<connector::hive::HiveColumnHandle>(
        colNameList[idx], columnType, veloxTypeList[idx], veloxTypeList[idx]);
    outNames.emplace_back(outName);
  }
  auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));

  if (readRel.has_virtual_table()) {
    // Inline values: the table handle and assignments built above are unused.
    return toVeloxPlan(readRel, outputType);
  } else {
    auto tableScanNode = std::make_shared<core::TableScanNode>(
        nextPlanNodeId(), std::move(outputType), std::move(tableHandle), std::move(assignments));
    // Set split info map.
    splitInfoMap_[tableScanNode->id()] = splitInfo;
    return tableScanNode;
  }
}

/// Converts the virtual table of a ReadRel into a Velox ValuesNode.
///
/// Every value struct of the virtual table becomes one RowVector. Inside a
/// struct the fields are laid out column by column: the field of column `col`
/// at row `batchId` sits at index `col * batchSize + batchId`. The batch size
/// is derived from the last struct and every struct must have exactly
/// `batchSize * numColumns` fields.
///
/// @param readRel The ReadRel holding the virtual table.
/// @param type Row type of the produced vectors.
/// @return A ValuesNode owning the created vectors.
/// Fails when the virtual table has no value struct, when a struct has an
/// unexpected number of fields, or when a field is not a constant with a
/// scalar (non-vector) value.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(
    const ::substrait::ReadRel& readRel,
    const RowTypePtr& type) {
  // The protobuf messages are only read, so they are bound by reference.
  const auto& readVirtualTable = readRel.virtual_table();
  const int64_t numVectors = readVirtualTable.values_size();
  // The batch size is taken from the last struct, which does not exist for an
  // empty table; indexing it would be out of range.
  VELOX_CHECK_GT(numVectors, 0, "At least one value struct is expected in the virtual table.");
  const int64_t numColumns = type->size();
  const int64_t valueFieldNums = readVirtualTable.values(numVectors - 1).fields_size();
  std::vector<RowVectorPtr> vectors;
  vectors.reserve(numVectors);

  // For the empty vectors, eg,vectors = makeRowVector(ROW({}, {}), 1).
  const int64_t batchSize = numColumns == 0 ? 1 : valueFieldNums / numColumns;

  for (int64_t index = 0; index < numVectors; ++index) {
    const auto& rowValue = readVirtualTable.values(index);
    auto fieldSize = rowValue.fields_size();
    VELOX_CHECK_EQ(fieldSize, batchSize * numColumns);

    std::vector<VectorPtr> children;
    children.reserve(numColumns);
    for (int64_t col = 0; col < numColumns; ++col) {
      const TypePtr& outputChildType = type->childAt(col);
      std::vector<variant> batchChild;
      batchChild.reserve(batchSize);
      for (int64_t batchId = 0; batchId < batchSize; batchId++) {
        // each value in the batch
        auto fieldIdx = col * batchSize + batchId;
        const auto& field = rowValue.fields(fieldIdx);

        auto expr = exprConverter_->toVeloxExpr(field);
        auto constantExpr = std::dynamic_pointer_cast<const core::ConstantTypedExpr>(expr);
        if (constantExpr == nullptr) {
          VELOX_FAIL("Expected constant expression");
        }
        if (constantExpr->hasValueVector()) {
          VELOX_UNSUPPORTED("Values node with complex type values is not supported yet");
        }
        batchChild.emplace_back(constantExpr->value());
      }
      children.emplace_back(setVectorFromVariants(outputChildType, batchChild, pool_));
    }

    vectors.emplace_back(std::make_shared<RowVector>(pool_, type, nullptr, batchSize, std::move(children)));
  }

  return std::make_shared<core::ValuesNode>(nextPlanNodeId(), std::move(vectors));
}

/// Dispatches a generic Substrait Rel to the conversion of the concrete
/// relation it holds. Fails with "not yet implemented" when the rel holds none
/// of the relations checked below.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::Rel& rel) {
  if (rel.has_aggregate()) {
    return toVeloxPlan(rel.aggregate());
  }
  if (rel.has_project()) {
    return toVeloxPlan(rel.project());
  }
  if (rel.has_filter()) {
    return toVeloxPlan(rel.filter());
  }
  if (rel.has_join()) {
    return toVeloxPlan(rel.join());
  }
  if (rel.has_cross()) {
    return toVeloxPlan(rel.cross());
  }
  if (rel.has_read()) {
    return toVeloxPlan(rel.read());
  }
  if (rel.has_sort()) {
    return toVeloxPlan(rel.sort());
  }
  if (rel.has_expand()) {
    return toVeloxPlan(rel.expand());
  }
  if (rel.has_generate()) {
    return toVeloxPlan(rel.generate());
  }
  if (rel.has_fetch()) {
    return toVeloxPlan(rel.fetch());
  }
  if (rel.has_window()) {
    return toVeloxPlan(rel.window());
  }
  if (rel.has_write()) {
    return toVeloxPlan(rel.write());
  }
  VELOX_NYI("Substrait conversion not supported for Rel.");
}

/// Converts the input relation of a Substrait RelRoot. Fails when the root
/// has no input.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::RelRoot& root) {
  // TODO: Use the names as the output names for the whole computing.
  // const auto& names = root.names();
  if (!root.has_input()) {
    VELOX_FAIL("Input is expected in RelRoot.");
  }
  return toVeloxPlan(root.input());
}

/// Entry point of the conversion: turns a whole Substrait plan into a Velox
/// plan. The plan must pass checkTypeExtension and contain exactly one
/// relation, which is either a RelRoot or a Rel.
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::Plan& substraitPlan) {
  VELOX_CHECK(checkTypeExtension(substraitPlan), "The type extension only have unknown type.");

  // Build the function map from the plan's extensions; this also creates the
  // expression converter that the relation conversions rely on.
  constructFunctionMap(substraitPlan);

  // In fact, only one RelRoot or Rel is expected here.
  VELOX_CHECK_EQ(substraitPlan.relations_size(), 1);
  const auto& planRel = substraitPlan.relations(0);

  if (planRel.has_root()) {
    return toVeloxPlan(planRel.root());
  }
  if (planRel.has_rel()) {
    return toVeloxPlan(planRel.rel());
  }
  VELOX_FAIL("RelRoot or Rel is expected in Plan.");
}

/// Returns the current plan node id formatted as a string and advances the
/// counter, so that consecutive calls return different ids.
std::string SubstraitToVeloxPlanConverter::nextPlanNodeId() {
  // Post-increment: the value formatted is the id before the increment.
  return fmt::format("{}", planNodeId_++);
}

/// Fills functionMap_ with the function anchor -> function name pairs of the
/// plan's function extensions, then creates the expression converter from
/// pool_ and that map.
void SubstraitToVeloxPlanConverter::constructFunctionMap(const ::substrait::Plan& substraitPlan) {
  for (const auto& extension : substraitPlan.extensions()) {
    // Extensions that do not declare a function are ignored.
    if (extension.has_extension_function()) {
      const auto& function = extension.extension_function();
      functionMap_[function.function_anchor()] = function.name();
    }
  }
  exprConverter_ = std::make_unique<SubstraitVeloxExprConverter>(pool_, functionMap_);
}

/// Splits a filter expression into its conjuncts and appends each conjunct to
/// the output vector matching its kind.
///
/// A scalar function named "and" is expanded recursively into its arguments;
/// any other scalar function, a singular-or-list and an if-then are appended
/// as they are. Every other expression kind is reported as not yet
/// implemented.
void SubstraitToVeloxPlanConverter::flattenConditions(
    const ::substrait::Expression& substraitFilter,
    std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
    std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
    std::vector<::substrait::Expression_IfThen>& ifThens) {
  using RexTypeCase = ::substrait::Expression::RexTypeCase;
  const auto rexType = substraitFilter.rex_type_case();

  if (rexType == RexTypeCase::kSingularOrList) {
    singularOrLists.emplace_back(substraitFilter.singular_or_list());
    return;
  }
  if (rexType == RexTypeCase::kIfThen) {
    ifThens.emplace_back(substraitFilter.if_then());
    return;
  }
  if (rexType != RexTypeCase::kScalarFunction) {
    VELOX_NYI("GetFlatConditions not supported for type '{}'", std::to_string(rexType));
  }

  const auto& function = substraitFilter.scalar_function();
  auto functionSpec = SubstraitParser::findFunctionSpec(functionMap_, function.function_reference());
  // TODO: Only and relation is supported here.
  if (SubstraitParser::getNameBeforeDelimiter(functionSpec) != "and") {
    scalarFunctions.emplace_back(function);
    return;
  }
  for (const auto& conjunct : function.arguments()) {
    flattenConditions(conjunct.value(), scalarFunctions, singularOrLists, ifThens);
  }
}

std::string SubstraitToVeloxPlanConverter::findFuncSpec(uint64_t id) {
  return SubstraitParser::findFunctionSpec(functionMap_, id);
}

/// Returns the index of the input stream referenced by a ReadRel, or -1 when
/// the rel does not read from a stream.
///
/// A stream input is recognized by the URI of the first local file containing
/// "iterator:"; the text following that marker is parsed as the index. A
/// failure to parse the index is raised as a Velox failure.
int32_t SubstraitToVeloxPlanConverter::getStreamIndex(const ::substrait::ReadRel& sRead) {
  if (!sRead.has_local_files()) {
    return -1;
  }

  const auto& files = sRead.local_files().items();
  if (files.size() == 0) {
    // bucketed scan may contains empty file list
    return -1;
  }

  // The stream input will be specified with the format of
  // "iterator:${index}".
  const std::string marker = "iterator:";
  const std::string uri = files[0].uri_file();
  const std::size_t markerPos = uri.find(marker);
  if (markerPos == std::string::npos) {
    return -1;
  }

  // Everything after the marker is the index.
  const std::string indexText = uri.substr(markerPos + marker.size());
  int32_t streamIndex = -1;
  try {
    streamIndex = std::stoi(indexText);
  } catch (const std::exception& err) {
    VELOX_FAIL(err.what());
  }
  return streamIndex;
}

/// Collects the field references compared by the equality conditions of a
/// join expression.
///
/// The expression is walked iteratively with an explicit stack. An "and"
/// contributes its first two arguments to the stack; an equality ("eq",
/// "equalto" or "decimal_equalto") must compare two field selections, whose
/// first one is appended to `leftExprs` and second one to `rightExprs`. Any
/// other function or expression kind is rejected.
void SubstraitToVeloxPlanConverter::extractJoinKeys(
    const ::substrait::Expression& joinExpression,
    std::vector<const ::substrait::Expression::FieldReference*>& leftExprs,
    std::vector<const ::substrait::Expression::FieldReference*>& rightExprs) {
  std::vector<const ::substrait::Expression*> pending{&joinExpression};

  while (!pending.empty()) {
    const auto* current = pending.back();
    pending.pop_back();

    if (current->rex_type_case() != ::substrait::Expression::RexTypeCase::kScalarFunction) {
      VELOX_FAIL("Unable to parse from join expression: {}", joinExpression.DebugString());
    }

    const auto& function = current->scalar_function();
    const auto& funcName = SubstraitParser::getNameBeforeDelimiter(
        SubstraitParser::findVeloxFunction(functionMap_, function.function_reference()));
    const auto& args = function.arguments();

    if (funcName == "and") {
      // Both sides of the conjunction still have to be visited.
      pending.push_back(&args[0].value());
      pending.push_back(&args[1].value());
      continue;
    }

    if (funcName == "eq" || funcName == "equalto" || funcName == "decimal_equalto") {
      VELOX_CHECK(std::all_of(args.cbegin(), args.cend(), [](const ::substrait::FunctionArgument& arg) {
        return arg.value().has_selection();
      }));
      leftExprs.push_back(&args[0].value().selection());
      rightExprs.push_back(&args[1].value().selection());
      continue;
    }

    VELOX_NYI("Join condition {} not supported.", funcName);
  }
}

/// Builds the subfield filters that are pushed down to the table scan.
///
/// The filter information of every condition is first recorded per column via
/// setFilterInfo and then turned into filters by mapToFilters.
///
/// @param inputNameList Column names, indexed by column.
/// @param inputTypeList Column types, indexed by column.
/// @param scalarFunctions Scalar-function conditions to push down. A "not"
///        must wrap a scalar function, and an "or" must have exactly two
///        arguments that are scalar functions or singular-or-lists.
/// @param singularOrLists Singular-or-list conditions to push down.
/// @return The filters produced by mapToFilters.
connector::hive::SubfieldFilters SubstraitToVeloxPlanConverter::createSubfieldFilters(
    const std::vector<std::string>& inputNameList,
    const std::vector<TypePtr>& inputTypeList,
    const std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
    const std::vector<::substrait::Expression_SingularOrList>& singularOrLists) {
  // The vector's subscript stands for the column index.
  std::vector<FilterInfo> columnToFilterInfo(inputTypeList.size());

  // Process scalarFunctions.
  for (const auto& scalarFunction : scalarFunctions) {
    auto filterNameSpec = SubstraitParser::findFunctionSpec(functionMap_, scalarFunction.function_reference());
    auto filterName = SubstraitParser::getNameBeforeDelimiter(filterNameSpec);

    if (filterName == sNot) {
      VELOX_CHECK(scalarFunction.arguments().size() == 1);
      // Bound by reference: the operand is only inspected, and copying a
      // protobuf Expression duplicates its whole subtree.
      const auto& expr = scalarFunction.arguments()[0].value();
      if (expr.has_scalar_function()) {
        // Set its child to filter info with reverse enabled.
        setFilterInfo(expr.scalar_function(), inputTypeList, columnToFilterInfo, true);
      } else {
        // TODO: support push down of Not In.
        VELOX_NYI("Scalar function expected.");
      }
    } else if (filterName == sOr) {
      VELOX_CHECK(scalarFunction.arguments().size() == 2);
      VELOX_CHECK(std::all_of(
          scalarFunction.arguments().cbegin(),
          scalarFunction.arguments().cend(),
          [](const ::substrait::FunctionArgument& arg) {
            return arg.value().has_scalar_function() || arg.value().has_singular_or_list();
          }));

      // Set the children functions to filter info. They should be
      // effective to the same field.
      for (const auto& arg : scalarFunction.arguments()) {
        const auto& expr = arg.value();
        if (expr.has_scalar_function()) {
          setFilterInfo(expr.scalar_function(), inputTypeList, columnToFilterInfo);
        } else if (expr.has_singular_or_list()) {
          setFilterInfo(expr.singular_or_list(), columnToFilterInfo);
        } else {
          VELOX_NYI("Scalar function or SingularOrList expected.");
        }
      }
    } else {
      setFilterInfo(scalarFunction, inputTypeList, columnToFilterInfo);
    }
  }

  // Process singularOrLists.
  for (const auto& list : singularOrLists) {
    setFilterInfo(list, columnToFilterInfo);
  }

  return mapToFilters(inputNameList, inputTypeList, columnToFilterInfo);
}

/// Tells whether the arguments are either a single field, or a field together
/// with a literal (in any order).
///
/// @param arguments The function arguments to inspect.
/// @param fieldIndex Set to the index of the referenced field whenever a
///        field selection is found among one or two arguments; left untouched
///        otherwise.
/// @return true for the single-field and the field-plus-literal shapes, false
///         for everything else.
bool SubstraitToVeloxPlanConverter::fieldOrWithLiteral(
    const ::google::protobuf::RepeatedPtrField<::substrait::FunctionArgument>& arguments,
    uint32_t& fieldIndex) {
  const auto numArguments = arguments.size();

  if (numArguments == 1) {
    const auto& onlyValue = arguments[0].value();
    if (!onlyValue.has_selection()) {
      return false;
    }
    // Only field exists.
    fieldIndex = SubstraitParser::parseReferenceSegment(onlyValue.selection().direct_reference());
    return true;
  }

  if (numArguments != 2) {
    // Not the field and literal combination.
    return false;
  }

  using RexTypeCase = ::substrait::Expression::RexTypeCase;
  bool sawField = false;
  bool sawLiteral = false;
  for (const auto& argument : arguments) {
    const auto& value = argument.value();
    const auto rexType = value.rex_type_case();
    if (rexType == RexTypeCase::kSelection) {
      fieldIndex = SubstraitParser::parseReferenceSegment(value.selection().direct_reference());
      sawField = true;
    } else if (rexType == RexTypeCase::kLiteral) {
      sawLiteral = true;
    }
  }

  // Whether the field and literal both exist.
  return sawField && sawLiteral;
}

/// Tells whether all the arguments of `function` refer to one and the same
/// column.
///
/// Only scalar-function and singular-or-list arguments are accepted; any
/// other argument kind makes the result false. When no column index is
/// collected at all the result is true.
bool SubstraitToVeloxPlanConverter::childrenFunctionsOnSameField(
    const ::substrait::Expression_ScalarFunction& function) {
  // Column indices referenced by the children.
  std::vector<int32_t> referencedColumns;
  for (const auto& child : function.arguments()) {
    const auto& childValue = child.value();
    if (childValue.has_scalar_function()) {
      // Collect every field selection among the child's own arguments.
      for (const auto& param : childValue.scalar_function().arguments()) {
        if (!param.value().has_selection()) {
          continue;
        }
        const auto& field = param.value().selection();
        VELOX_CHECK(field.has_direct_reference());
        int32_t colIdx = SubstraitParser::parseReferenceSegment(field.direct_reference());
        referencedColumns.emplace_back(colIdx);
      }
    } else if (childValue.has_singular_or_list()) {
      int32_t colIdx = getColumnIndexFromSingularOrList(childValue.singular_or_list());
      referencedColumns.emplace_back(colIdx);
    } else {
      return false;
    }
  }

  // Every collected index has to match the first one.
  for (const auto colIdx : referencedColumns) {
    if (colIdx != referencedColumns.front()) {
      return false;
    }
  }
  return true;
}

/// Tells whether a scalar-function condition can be pushed down: its name has
/// to be one of the supported functions and its arguments have to be a field,
/// or a field with a literal.
///
/// @param fieldIdx Receives the referenced field index through
///        fieldOrWithLiteral; it is not touched when the function name is not
///        supported.
bool SubstraitToVeloxPlanConverter::canPushdownFunction(
    const ::substrait::Expression_ScalarFunction& scalarFunction,
    const std::string& filterName,
    uint32_t& fieldIdx) {
  // Conditions that can be pushed down.
  static const std::unordered_set<std::string> supportedFunctions = {sIsNotNull, sIsNull, sGte, sGt, sLte, sLt, sEqual};

  if (supportedFunctions.find(filterName) == supportedFunctions.end()) {
    return false;
  }
  // The arg should be field or field with literal.
  return fieldOrWithLiteral(scalarFunction.arguments(), fieldIdx);
}

/// Tells whether a "not" condition can be pushed down.
///
/// The single argument of the "not" has to be a supported comparison on a
/// field, or on a field with a literal, and the range recorder of that field
/// has to accept the reversed function.
///
/// @param scalarFunction The "not" function; exactly one argument is expected.
/// @param rangeRecorders Range recorders indexed by column; the recorder of
///        the referenced field is updated by the check.
bool SubstraitToVeloxPlanConverter::canPushdownNot(
    const ::substrait::Expression_ScalarFunction& scalarFunction,
    std::vector<RangeRecorder>& rangeRecorders) {
  VELOX_CHECK(scalarFunction.arguments().size() == 1, "Only one arg is expected for Not.");

  const auto& negated = scalarFunction.arguments()[0].value();
  if (!negated.has_scalar_function()) {
    // Not for a Boolean Literal or Or List is not supported currently.
    // It can be pushed down with an AlwaysTrue or AlwaysFalse Range.
    return false;
  }

  const auto& negatedFunction = negated.scalar_function();
  auto negatedSpec = SubstraitParser::findFunctionSpec(functionMap_, negatedFunction.function_reference());
  auto negatedName = SubstraitParser::getNameBeforeDelimiter(negatedSpec);

  static const std::unordered_set<std::string> supportedNotFunctions = {sGte, sGt, sLte, sLt, sEqual};

  uint32_t fieldIdx;
  const bool isFieldOrWithLiteral = fieldOrWithLiteral(negatedFunction.arguments(), fieldIdx);

  if (supportedNotFunctions.find(negatedName) == supportedNotFunctions.end()) {
    return false;
  }
  if (!isFieldOrWithLiteral) {
    return false;
  }
  // The recorder is consulted last, and only for a valid field index.
  if (rangeRecorders.at(fieldIdx).setCertainRangeForFunction(negatedName, true /*reverse*/)) {
    return true;
  }
  return false;
}

/// Tells whether an OR condition may be pushed down. Every argument of the OR
/// must be either a supported scalar function or a pushable IN list, and the
/// range recorder of the referenced column must accept each of them in an OR
/// relation. Any other kind of argument rejects the whole OR.
bool SubstraitToVeloxPlanConverter::canPushdownOr(
    const ::substrait::Expression_ScalarFunction& scalarFunction,
    std::vector<RangeRecorder>& rangeRecorders) {
  // An OR condition whose children functions are on different columns is not
  // supported to be pushed down.
  if (!childrenFunctionsOnSameField(scalarFunction)) {
    return false;
  }

  static const std::unordered_set<std::string> supportedOrFunctions = {sIsNotNull, sGte, sGt, sLte, sLt, sEqual};

  for (const auto& branch : scalarFunction.arguments()) {
    const auto& expr = branch.value();

    if (expr.has_scalar_function()) {
      const auto& childFunction = expr.scalar_function();
      auto childSpec = SubstraitParser::findFunctionSpec(functionMap_, childFunction.function_reference());
      auto childName = SubstraitParser::getNameBeforeDelimiter(childSpec);

      uint32_t fieldIdx;
      const bool onField = fieldOrWithLiteral(childFunction.arguments(), fieldIdx);
      // The function must be supported and its arguments should be a field or
      // a field with a literal.
      if (supportedOrFunctions.count(childName) == 0 || !onField) {
        return false;
      }
      if (!rangeRecorders.at(fieldIdx).setCertainRangeForFunction(
              childName, false /*reverse*/, true /*forOrRelation*/)) {
        return false;
      }
      continue;
    }

    if (expr.has_singular_or_list()) {
      const auto& inList = expr.singular_or_list();
      // IN pushdown for int-like types is disabled inside an OR.
      if (!canPushdownSingularOrList(inList, true)) {
        return false;
      }
      const uint32_t fieldIdx = getColumnIndexFromSingularOrList(inList);
      if (!rangeRecorders.at(fieldIdx).setInRange(true /*forOrRelation*/)) {
        return false;
      }
      continue;
    }

    // An OR relation between other expressions is not supported to be pushed
    // down currently.
    return false;
  }
  return true;
}

/// Splits the given filter conditions into those that can be pushed down as
/// subfield filters and those that have to stay as remaining filters.
///
/// @param rangeRecorders Per-column recorders consulted (and updated) to decide
/// whether a condition can be accepted for its column.
/// @param scalarFunctions Scalar function conditions to classify.
/// @param subfieldFunctions Receives the scalar functions that can be pushed down.
/// @param remainingFunctions Receives the scalar functions that cannot.
/// @param singularOrLists IN-list conditions to classify.
/// @param subfieldOrLists Receives the IN lists that can be pushed down.
/// @param remainingOrLists Receives the IN lists that cannot.
/// @param veloxTypeList Column types, used to detect decimal columns; may be empty.
/// @param format File format; decimal filters are never pushed down for ORC.
void SubstraitToVeloxPlanConverter::separateFilters(
    std::vector<RangeRecorder>& rangeRecorders,
    const std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
    std::vector<::substrait::Expression_ScalarFunction>& subfieldFunctions,
    std::vector<::substrait::Expression_ScalarFunction>& remainingFunctions,
    const std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
    std::vector<::substrait::Expression_SingularOrList>& subfieldOrLists,
    std::vector<::substrait::Expression_SingularOrList>& remainingOrLists,
    const std::vector<TypePtr>& veloxTypeList,
    const dwio::common::FileFormat& format) {
  // An IN list is pushed down only if it is pushable by itself and the range
  // recorder of its column accepts an IN range.
  for (const auto& singularOrList : singularOrLists) {
    const bool pushable = canPushdownSingularOrList(singularOrList) &&
        rangeRecorders.at(getColumnIndexFromSingularOrList(singularOrList)).setInRange();
    if (pushable) {
      subfieldOrLists.emplace_back(singularOrList);
    } else {
      remainingOrLists.emplace_back(singularOrList);
    }
  }

  for (const auto& scalarFunction : scalarFunctions) {
    auto filterNameSpec = SubstraitParser::findFunctionSpec(functionMap_, scalarFunction.function_reference());
    auto filterName = SubstraitParser::getNameBeforeDelimiter(filterNameSpec);

    // Add all decimal filters to remaining functions because their pushdown is not supported.
    if (format == dwio::common::FileFormat::ORC && scalarFunction.arguments().size() > 0) {
      // Bind by reference: copying the protobuf Expression here is unnecessary.
      const auto& firstArg = scalarFunction.arguments().at(0).value();
      if (firstArg.has_selection()) {
        const uint32_t fieldIndex = SubstraitParser::parseReferenceSegment(firstArg.selection().direct_reference());
        if (!veloxTypeList.empty() && veloxTypeList.at(fieldIndex)->isDecimal()) {
          remainingFunctions.emplace_back(scalarFunction);
          continue;
        }
      }
    }

    // NOT and OR have dedicated checks; every other condition is checked by
    // name and then offered to the range recorder of the referenced column.
    bool pushable = false;
    if (filterName == sNot) {
      pushable = canPushdownNot(scalarFunction, rangeRecorders);
    } else if (filterName == sOr) {
      pushable = canPushdownOr(scalarFunction, rangeRecorders);
    } else {
      uint32_t fieldIdx;
      // 'fieldIdx' is only read when canPushdownFunction() returned true.
      pushable = canPushdownFunction(scalarFunction, filterName, fieldIdx) &&
          rangeRecorders.at(fieldIdx).setCertainRangeForFunction(filterName);
    }

    if (pushable) {
      subfieldFunctions.emplace_back(scalarFunction);
    } else {
      remainingFunctions.emplace_back(scalarFunction);
    }
  }
}

/// Records the range implied by the named function on this recorder.
///
/// @param functionName Name of the condition (e.g. lt, gte, equal, or, ...).
/// @param reverse True when the condition is negated.
/// @param forOrRelation Forwarded to the bound setters.
/// @return The result of the underlying setter; false for unsupported
/// functions or unsupported negations.
bool SubstraitToVeloxPlanConverter::RangeRecorder::setCertainRangeForFunction(
    const std::string& functionName,
    bool reverse,
    bool forOrRelation) {
  if (functionName == sLt || functionName == sLte) {
    // A negated "less than" constrains the left bound instead of the right one.
    return reverse ? setLeftBound(forOrRelation) : setRightBound(forOrRelation);
  }

  if (functionName == sGt || functionName == sGte) {
    // A negated "greater than" constrains the right bound instead of the left one.
    return reverse ? setRightBound(forOrRelation) : setLeftBound(forOrRelation);
  }

  if (functionName == sEqual) {
    if (reverse) {
      // Not equal means lt or gt.
      return setMultiRange();
    }
    return setLeftBound(forOrRelation) && setRightBound(forOrRelation);
  }

  if (functionName == sOr) {
    // A negated OR is not supported.
    return reverse ? false : setMultiRange();
  }

  if (functionName == sIsNotNull) {
    // A negated IsNotNull is not supported; a plain IsNotNull can always
    // coexist with the other range.
    return !reverse;
  }

  if (functionName == sIsNull) {
    if (reverse) {
      return setCertainRangeForFunction(sIsNotNull, false, forOrRelation);
    }
    return setIsNull();
  }

  return false;
}

/// Applies one condition to the filter info of a column.
///
/// @param filterName Name of the condition.
/// @param literalVariant The literal the column is compared with, if any.
/// @param columnFilterInfo Filter info to update.
/// @param reverse True when the condition is negated, in which case the
/// opposite bound/exclusiveness is recorded.
void SubstraitToVeloxPlanConverter::setColumnFilterInfo(
    const std::string& filterName,
    std::optional<variant> literalVariant,
    FilterInfo& columnFilterInfo,
    bool reverse) {
  if (filterName == sIsNotNull) {
    if (reverse) {
      columnFilterInfo.setNull();
    } else {
      columnFilterInfo.forbidsNull();
    }
    return;
  }

  if (filterName == sIsNull) {
    if (reverse) {
      columnFilterInfo.forbidsNull();
    } else {
      columnFilterInfo.setNull();
    }
    return;
  }

  if (filterName == sGte) {
    // not(a >= x) is a < x.
    if (reverse) {
      columnFilterInfo.setUpper(literalVariant, true);
    } else {
      columnFilterInfo.setLower(literalVariant, false);
    }
    return;
  }

  if (filterName == sGt) {
    // not(a > x) is a <= x.
    if (reverse) {
      columnFilterInfo.setUpper(literalVariant, false);
    } else {
      columnFilterInfo.setLower(literalVariant, true);
    }
    return;
  }

  if (filterName == sLte) {
    // not(a <= x) is a > x.
    if (reverse) {
      columnFilterInfo.setLower(literalVariant, true);
    } else {
      columnFilterInfo.setUpper(literalVariant, false);
    }
    return;
  }

  if (filterName == sLt) {
    // not(a < x) is a >= x.
    if (reverse) {
      columnFilterInfo.setLower(literalVariant, false);
    } else {
      columnFilterInfo.setUpper(literalVariant, true);
    }
    return;
  }

  if (filterName == sEqual) {
    if (reverse) {
      columnFilterInfo.setNotValue(literalVariant);
    } else {
      // Equality is recorded as an inclusive range [x, x].
      columnFilterInfo.setLower(literalVariant, false);
      columnFilterInfo.setUpper(literalVariant, false);
    }
    return;
  }

  VELOX_NYI("setColumnFilterInfo not supported for filter name '{}'", filterName);
}

/// Reads the literal as the native C++ type of 'kind' and wraps the value in a
/// variant.
template <facebook::velox::TypeKind kind>
variant getVariantFromLiteral(const ::substrait::Expression::Literal& literal) {
  using NativeT = typename facebook::velox::TypeTraits<kind>::NativeType;
  return variant(SubstraitParser::getLiteralValue<NativeT>(literal));
}

/// Extracts the column and the optional literal from a scalar function and
/// records the resulting condition in the filter info of that column.
///
/// @param scalarFunction Condition whose arguments are field selections and/or
/// literals; any other argument kind is rejected.
/// @param inputTypeList Column types, indexed by column index.
/// @param columnToFilterInfo Per-column filter infos, indexed by column index.
/// @param reverse True when the condition is negated.
void SubstraitToVeloxPlanConverter::setFilterInfo(
    const ::substrait::Expression_ScalarFunction& scalarFunction,
    const std::vector<TypePtr>& inputTypeList,
    std::vector<FilterInfo>& columnToFilterInfo,
    bool reverse) {
  auto functionSpec = SubstraitParser::findFunctionSpec(functionMap_, scalarFunction.function_reference());
  auto functionName = SubstraitParser::getNameBeforeDelimiter(functionSpec);

  // Column index and literal bound found among the arguments, together with
  // the kind of every argument in order of appearance.
  std::optional<uint32_t> columnIndex;
  std::optional<::substrait::Expression_Literal> boundLiteral;
  std::vector<std::string> argumentKinds;

  for (const auto& argument : scalarFunction.arguments()) {
    const auto& expr = argument.value();
    const auto rexType = expr.rex_type_case();
    if (rexType == ::substrait::Expression::RexTypeCase::kSelection) {
      argumentKinds.emplace_back("kSelection");
      columnIndex = SubstraitParser::parseReferenceSegment(expr.selection().direct_reference());
    } else if (rexType == ::substrait::Expression::RexTypeCase::kLiteral) {
      argumentKinds.emplace_back("kLiteral");
      boundLiteral = expr.literal();
    } else {
      VELOX_NYI("Substrait conversion not supported for arg type '{}'", std::to_string(rexType));
    }
  }

  // lt <=> gt and lte <=> gte when the operands are swapped.
  static const std::unordered_map<std::string, std::string> functionRevertMap = {
      {sLt, sGt}, {sGt, sLt}, {sGte, sLte}, {sLte, sGte}};

  // Handle the case where the literal comes before the variable in a binary
  // function, e.g. "123 < q1".
  const bool literalFirst =
      argumentKinds.size() > 1 && argumentKinds[0] == "kLiteral" && argumentKinds[1] == "kSelection";
  if (literalFirst) {
    const auto mirrored = functionRevertMap.find(functionName);
    if (mirrored != functionRevertMap.end()) {
      functionName = mirrored->second;
    }
  }

  if (!columnIndex.has_value()) {
    VELOX_NYI("Column index is expected in subfield filters creation.");
  }

  // Convert the literal (if any) to a variant of the column's type.
  const uint32_t column = columnIndex.value();
  std::optional<variant> bound;

  const auto& columnType = inputTypeList[column];
  switch (columnType->kind()) {
    case TypeKind::TINYINT:
    case TypeKind::SMALLINT:
    case TypeKind::INTEGER:
    case TypeKind::BIGINT:
    case TypeKind::REAL:
    case TypeKind::DOUBLE:
    case TypeKind::BOOLEAN:
    case TypeKind::VARCHAR:
    case TypeKind::HUGEINT:
      if (boundLiteral) {
        auto kind = columnType->kind();
        bound = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(getVariantFromLiteral, kind, boundLiteral.value());
      }
      break;
    case TypeKind::ARRAY:
    case TypeKind::MAP:
    case TypeKind::ROW:
      // Doing nothing here can let filter IsNotNull still work.
      break;
    default:
      VELOX_NYI("Subfield filters creation not supported for input type '{}' in setFilterInfo", columnType->toString());
  }

  setColumnFilterInfo(functionName, bound, columnToFilterInfo[column], reverse);
}

/// Appends to 'colFilters' the ranges that together express "value != x":
/// one range below x and one range above x.
///
/// For BigintRange the ranges are [lowest, x - 1] and [x + 1, max]; a range is
/// skipped when x already equals the corresponding limit. For other range
/// types an exclusive bound at x is used on each side.
///
/// @param notVariant The excluded value x.
/// @param nullAllowed Passed to every created range.
/// @param colFilters Receives the created ranges, lower range first.
template <TypeKind KIND, typename FilterType>
void SubstraitToVeloxPlanConverter::createNotEqualFilter(
    variant notVariant,
    bool nullAllowed,
    std::vector<std::unique_ptr<FilterType>>& colFilters) {
  using NativeType = typename RangeTraits<KIND>::NativeType;
  using RangeType = typename RangeTraits<KIND>::RangeType;

  const auto& excluded = notVariant.value<NativeType>();

  // Range for "value > excluded".
  std::unique_ptr<FilterType> aboveExcluded;
  if constexpr (std::is_same_v<RangeType, common::BigintRange>) {
    if (excluded < getMax<NativeType>()) {
      aboveExcluded =
          std::make_unique<common::BigintRange>(excluded + 1 /*lower*/, getMax<NativeType>() /*upper*/, nullAllowed);
    }
  } else {
    aboveExcluded = std::make_unique<RangeType>(
        excluded /*lower*/,
        false /*lowerUnbounded*/,
        true /*lowerExclusive*/,
        getMax<NativeType>() /*upper*/,
        true /*upperUnbounded*/,
        false /*upperExclusive*/,
        nullAllowed);
  }

  // Range for "value < excluded".
  std::unique_ptr<FilterType> belowExcluded;
  if constexpr (std::is_same_v<RangeType, common::BigintRange>) {
    if (getLowest<NativeType>() < excluded) {
      belowExcluded = std::make_unique<common::BigintRange>(
          getLowest<NativeType>() /*lower*/, excluded - 1 /*upper*/, nullAllowed);
    }
  } else {
    belowExcluded = std::make_unique<RangeType>(
        getLowest<NativeType>() /*lower*/,
        true /*lowerUnbounded*/,
        false /*lowerExclusive*/,
        excluded /*upper*/,
        false /*upperUnbounded*/,
        true /*upperExclusive*/,
        nullAllowed);
  }

  // To avoid overlap in BigintMultiRange, append the range below the excluded
  // value before the range above it, so the ranges are in ascending order.
  if (belowExcluded) {
    colFilters.emplace_back(std::move(belowExcluded));
  }
  if (aboveExcluded) {
    colFilters.emplace_back(std::move(aboveExcluded));
  }
}

/// Primary template of setInFilter: intentionally empty, so for any KIND
/// without an explicit specialization below no entry is added to 'filters'.
/// NOTE(review): a call that reaches this template with a non-empty value list
/// would add no filter at all — confirm callers only pass values for the
/// specialized kinds.
template <TypeKind KIND>
void SubstraitToVeloxPlanConverter::setInFilter(
    const std::vector<variant>& variants,
    bool nullAllowed,
    const std::string& inputName,
    connector::hive::SubfieldFilters& filters) {}

/// IN filter for BIGINT columns: collects the int64 values and registers a
/// bigint-values filter under 'inputName'.
template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::BIGINT>(
    const std::vector<variant>& variants,
    bool nullAllowed,
    const std::string& inputName,
    connector::hive::SubfieldFilters& filters) {
  std::vector<int64_t> inValues;
  inValues.reserve(variants.size());
  for (const auto& item : variants) {
    inValues.emplace_back(item.value<int64_t>());
  }
  filters[common::Subfield(inputName)] = common::createBigintValues(inValues, nullAllowed);
}

/// IN filter for INTEGER columns: the values are read as int32, widened to
/// int64 and registered as a bigint-values filter under 'inputName'.
template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::INTEGER>(
    const std::vector<variant>& variants,
    bool nullAllowed,
    const std::string& inputName,
    connector::hive::SubfieldFilters& filters) {
  // Bigint values are used for the int type.
  std::vector<int64_t> inValues;
  inValues.reserve(variants.size());
  for (const auto& item : variants) {
    // Read with the matching type, then widen.
    const int64_t widened = item.value<int32_t>();
    inValues.emplace_back(widened);
  }
  filters[common::Subfield(inputName)] = common::createBigintValues(inValues, nullAllowed);
}

/// IN filter for SMALLINT columns: the values are read as int16, widened to
/// int64 and registered as a bigint-values filter under 'inputName'.
template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::SMALLINT>(
    const std::vector<variant>& variants,
    bool nullAllowed,
    const std::string& inputName,
    connector::hive::SubfieldFilters& filters) {
  // Bigint values are used for the small int type.
  std::vector<int64_t> inValues;
  inValues.reserve(variants.size());
  for (const auto& item : variants) {
    // Read with the matching type, then widen.
    const int64_t widened = item.value<int16_t>();
    inValues.emplace_back(widened);
  }
  filters[common::Subfield(inputName)] = common::createBigintValues(inValues, nullAllowed);
}

/// IN filter for TINYINT columns: the values are read as int8, widened to
/// int64 and registered as a bigint-values filter under 'inputName'.
template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::TINYINT>(
    const std::vector<variant>& variants,
    bool nullAllowed,
    const std::string& inputName,
    connector::hive::SubfieldFilters& filters) {
  // Bigint values are used for the tiny int type.
  std::vector<int64_t> inValues;
  inValues.reserve(variants.size());
  for (const auto& item : variants) {
    // Read with the matching type, then widen.
    const int64_t widened = item.value<int8_t>();
    inValues.emplace_back(widened);
  }
  filters[common::Subfield(inputName)] = common::createBigintValues(inValues, nullAllowed);
}

/// IN filter for VARCHAR columns: collects the string values and registers a
/// BytesValues filter under 'inputName'.
template <>
void SubstraitToVeloxPlanConverter::setInFilter<TypeKind::VARCHAR>(
    const std::vector<variant>& variants,
    bool nullAllowed,
    const std::string& inputName,
    connector::hive::SubfieldFilters& filters) {
  std::vector<std::string> inValues;
  inValues.reserve(variants.size());
  for (const auto& item : variants) {
    inValues.emplace_back(item.value<std::string>());
  }
  filters[common::Subfield(inputName)] = std::make_unique<common::BytesValues>(inValues, nullAllowed);
}

/// Registers the given column filters under 'inputName'.
///
/// Nothing is registered for an empty list. A single filter is registered as
/// is. Several filters are combined into the multi-range type of KIND; bigint
/// ranges are first sorted by their lower bound.
template <TypeKind KIND, typename FilterType>
void SubstraitToVeloxPlanConverter::setSubfieldFilter(
    std::vector<std::unique_ptr<FilterType>> colFilters,
    const std::string& inputName,
    bool nullAllowed,
    connector::hive::SubfieldFilters& filters) {
  using MultiRangeType = typename RangeTraits<KIND>::MultiRangeType;

  if (colFilters.empty()) {
    return;
  }

  if (colFilters.size() == 1) {
    filters[common::Subfield(inputName)] = std::move(colFilters[0]);
    return;
  }

  // The ranges handed to BigintMultiRange should be sorted.
  if (colFilters[0]->kind() == common::FilterKind::kBigintRange) {
    std::sort(colFilters.begin(), colFilters.end(), [](const auto& lhs, const auto& rhs) {
      return dynamic_cast<common::BigintRange*>(lhs.get())->lower() <
          dynamic_cast<common::BigintRange*>(rhs.get())->lower();
    });
  }

  if constexpr (std::is_same_v<MultiRangeType, common::MultiRange>) {
    filters[common::Subfield(inputName)] =
        std::make_unique<common::MultiRange>(std::move(colFilters), nullAllowed, true /*nanAllowed*/);
  } else {
    filters[common::Subfield(inputName)] = std::make_unique<MultiRangeType>(std::move(colFilters), nullAllowed);
  }
}

/// Builds the subfield filter of one column from its FilterInfo and stores it
/// in 'filters' under 'inputName'. Does nothing when the FilterInfo is not
/// initialized.
///
/// Depending on KIND:
///  - HUGEINT: not implemented.
///  - BOOLEAN: not-equal, IsNull/IsNotNull or equal.
///  - ARRAY/MAP: only IsNull/IsNotNull.
///  - otherwise: IN list, not-equal, IsNull/IsNotNull, or one range per recorded
///    lower/upper bound pair.
///
/// @param colIdx Column index (not read in this function).
/// @param inputName Column name used as the Subfield key.
/// @param inputType Column type; used for the short-decimal limits and in
/// error messages.
/// @param filterInfo Recorded conditions of the column.
/// @param filters Receives the constructed filter.
template <TypeKind KIND, typename FilterType>
void SubstraitToVeloxPlanConverter::constructSubfieldFilters(
    uint32_t colIdx,
    const std::string& inputName,
    const TypePtr& inputType,
    const FilterInfo& filterInfo,
    connector::hive::SubfieldFilters& filters) {
  if (!filterInfo.isInitialized()) {
    return;
  }

  bool nullAllowed = filterInfo.nullAllowed_;
  bool isNull = filterInfo.isNull_;
  // Number of ranges to build: the longer of the lower and upper bound lists.
  uint32_t rangeSize = std::max(filterInfo.lowerBounds_.size(), filterInfo.upperBounds_.size());

  if constexpr (KIND == facebook::velox::TypeKind::HUGEINT) {
    // TODO: open it when the Velox's modification is ready.
    VELOX_NYI("constructSubfieldFilters not support for HUGEINT type");
  } else if constexpr (KIND == facebook::velox::TypeKind::BOOLEAN) {
    // Handle bool type filters.
    // Not equal: "b != v" is expressed as "b == !v".
    if (filterInfo.notValue_) {
      filters[common::Subfield(inputName)] =
          std::move(std::make_unique<common::BoolValue>(!filterInfo.notValue_.value().value<bool>(), nullAllowed));
    } else if (rangeSize == 0) {
      // IsNull/IsNotNull.
      if (!nullAllowed) {
        filters[common::Subfield(inputName)] = std::move(std::make_unique<common::IsNotNull>());
      } else if (isNull) {
        filters[common::Subfield(inputName)] = std::move(std::make_unique<common::IsNull>());
      } else {
        VELOX_NYI("Only IsNotNull and IsNull are supported in constructSubfieldFilters when no other filter ranges.");
      }
      return;
    } else {
      // Equal: the first lower and upper bounds must hold the same value.
      auto value = filterInfo.lowerBounds_[0].value().value<bool>();
      VELOX_CHECK(value == filterInfo.upperBounds_[0].value().value<bool>(), "invalid state of bool equal");
      filters[common::Subfield(inputName)] = std::move(std::make_unique<common::BoolValue>(value, nullAllowed));
    }
  } else if constexpr (KIND == facebook::velox::TypeKind::ARRAY || KIND == facebook::velox::TypeKind::MAP) {
    // Only IsNotNull and IsNull are supported for array and map types.
    // When range bounds were recorded (rangeSize != 0), no filter is created.
    if (rangeSize == 0) {
      if (!nullAllowed) {
        filters[common::Subfield(inputName)] = std::move(std::make_unique<common::IsNotNull>());
      } else if (isNull) {
        filters[common::Subfield(inputName)] = std::move(std::make_unique<common::IsNull>());
      } else {
        VELOX_NYI(
            "Only IsNotNull and IsNull are supported in constructSubfieldFilters for input type '{}'.",
            inputType->toString());
      }
    }
  } else {
    using NativeType = typename RangeTraits<KIND>::NativeType;
    using RangeType = typename RangeTraits<KIND>::RangeType;
    using MultiRangeType = typename RangeTraits<KIND>::MultiRangeType;

    // Handle 'in' filter.
    if (filterInfo.values_.size() > 0) {
      // To filter out null is a default behaviour of Spark IN expression.
      nullAllowed = false;
      setInFilter<KIND>(filterInfo.values_, nullAllowed, inputName, filters);
      // Currently, In cannot coexist with other filter conditions
      // due to multirange is in 'OR' relation but 'AND' is needed.
      // Note that these checks run after the IN filter has already been stored.
      VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot be supported after IN filter.");
      VELOX_CHECK(!filterInfo.notValue_.has_value(), "Not equal cannot be supported after IN filter.");
      return;
    }

    // Construct the Filters.
    std::vector<std::unique_ptr<FilterType>> colFilters;

    // Handle not(equal) filter.
    if (filterInfo.notValue_) {
      variant notVariant = filterInfo.notValue_.value();
      createNotEqualFilter<KIND, FilterType>(notVariant, filterInfo.nullAllowed_, colFilters);
      // Currently, Not-equal cannot coexist with other filter conditions
      // due to multirange is in 'OR' relation but 'AND' is needed.
      VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot be supported after not-equal filter.");
      // A single range is stored directly; several ranges are wrapped in the
      // multi-range type of KIND.
      if constexpr (std::is_same_v<MultiRangeType, common::MultiRange>) {
        if (colFilters.size() == 1) {
          filters[common::Subfield(inputName)] = std::move(colFilters.front());
        } else {
          filters[common::Subfield(inputName)] =
              std::make_unique<common::MultiRange>(std::move(colFilters), nullAllowed, true /*nanAllowed*/);
        }
      } else {
        if (colFilters.size() == 1) {
          filters[common::Subfield(inputName)] = std::move(colFilters.front());
        } else {
          filters[common::Subfield(inputName)] = std::make_unique<MultiRangeType>(std::move(colFilters), nullAllowed);
        }
      }
      return;
    }

    // Handle null filtering.
    if (rangeSize == 0) {
      if (!nullAllowed) {
        filters[common::Subfield(inputName)] = std::move(std::make_unique<common::IsNotNull>());
      } else if (isNull) {
        filters[common::Subfield(inputName)] = std::move(std::make_unique<common::IsNull>());
      } else {
        VELOX_NYI("Only IsNotNull and IsNull are supported in constructSubfieldFilters when no other filter ranges.");
      }
      return;
    }

    // Default lower bound: the short-decimal minimum for short decimals,
    // otherwise the lowest value of the native type.
    NativeType lowerBound;
    if constexpr (KIND == facebook::velox::TypeKind::BIGINT) {
      if (inputType->isShortDecimal()) {
        lowerBound = DecimalUtil::kShortDecimalMin;
      } else {
        lowerBound = getLowest<NativeType>();
      }
    } else {
      lowerBound = getLowest<NativeType>();
    }

    // Default upper bound: the short-decimal maximum for short decimals,
    // otherwise the maximum value of the native type.
    NativeType upperBound;
    if constexpr (KIND == facebook::velox::TypeKind::BIGINT) {
      if (inputType->isShortDecimal()) {
        upperBound = DecimalUtil::kShortDecimalMax;
      } else {
        upperBound = getMax<NativeType>();
      }
    } else {
      upperBound = getMax<NativeType>();
    }

    bool lowerUnbounded = true;
    bool upperUnbounded = true;
    bool lowerExclusive = false;
    bool upperExclusive = false;

    // Handle other filter ranges.
    // NOTE(review): the bound variables above are declared outside the loop, so
    // a bound set at one index is still in effect at later indices that do not
    // provide their own — confirm this carry-over is intended.
    for (uint32_t idx = 0; idx < rangeSize; idx++) {
      if (idx < filterInfo.lowerBounds_.size() && filterInfo.lowerBounds_[idx]) {
        lowerUnbounded = false;
        variant lowerVariant = filterInfo.lowerBounds_[idx].value();
        lowerBound = lowerVariant.value<NativeType>();
        lowerExclusive = filterInfo.lowerExclusives_[idx];
      }

      if (idx < filterInfo.upperBounds_.size() && filterInfo.upperBounds_[idx]) {
        upperUnbounded = false;
        variant upperVariant = filterInfo.upperBounds_[idx].value();
        upperBound = upperVariant.value<NativeType>();
        upperExclusive = filterInfo.upperExclusives_[idx];
      }

      std::unique_ptr<FilterType> filter;
      if constexpr (std::is_same_v<RangeType, common::BigintRange>) {
        // BigintRange takes inclusive bounds, so exclusive bounds are shifted by one.
        // NOTE(review): unlike createNotEqualFilter, the +1/-1 here is not guarded
        // against the numeric limits — confirm an exclusive bound at the limit of a
        // 64-bit column cannot reach this point.
        filter = std::move(std::make_unique<common::BigintRange>(
            lowerExclusive ? lowerBound + 1 : lowerBound, upperExclusive ? upperBound - 1 : upperBound, nullAllowed));
      } else {
        filter = std::move(std::make_unique<RangeType>(
            lowerBound, lowerUnbounded, lowerExclusive, upperBound, upperUnbounded, upperExclusive, nullAllowed));
      }

      colFilters.emplace_back(std::move(filter));
    }

    // Set the SubfieldFilter.
    setSubfieldFilter<KIND, FilterType>(std::move(colFilters), inputName, filterInfo.nullAllowed_, filters);
  }
}

/// Returns true when every type extension declared in the plan is named
/// "UNKNOWN"; extensions that are not type extensions are ignored.
bool SubstraitToVeloxPlanConverter::checkTypeExtension(const ::substrait::Plan& substraitPlan) {
  for (const auto& extension : substraitPlan.extensions()) {
    // Only the UNKNOWN type is supported in a UserDefined type extension.
    if (extension.has_extension_type() && extension.extension_type().name() != "UNKNOWN") {
      return false;
    }
  }
  return true;
}

/// Builds the subfield filters of all columns whose FilterInfo is initialized.
///
/// @param inputNameList Column names, indexed by column index.
/// @param inputTypeList Column types, indexed by column index.
/// @param columnToFilterInfo Per-column filter infos, indexed by column index.
/// @return The constructed filters keyed by column name.
connector::hive::SubfieldFilters SubstraitToVeloxPlanConverter::mapToFilters(
    const std::vector<std::string>& inputNameList,
    const std::vector<TypePtr>& inputTypeList,
    std::vector<FilterInfo>& columnToFilterInfo) {
  connector::hive::SubfieldFilters filters;

  for (uint32_t colIdx = 0; colIdx < inputNameList.size(); colIdx++) {
    const auto& columnInfo = columnToFilterInfo[colIdx];
    if (!columnInfo.isInitialized()) {
      continue;
    }

    const auto& columnName = inputNameList[colIdx];
    const auto& columnType = inputTypeList[colIdx];

    // Date columns are dispatched as INTEGER with bigint ranges.
    if (columnType->isDate()) {
      constructSubfieldFilters<TypeKind::INTEGER, common::BigintRange>(
          colIdx, columnName, columnType, columnInfo, filters);
      continue;
    }

    switch (columnType->kind()) {
      case TypeKind::TINYINT:
        constructSubfieldFilters<TypeKind::TINYINT, common::BigintRange>(
            colIdx, columnName, columnType, columnInfo, filters);
        break;
      case TypeKind::SMALLINT:
        constructSubfieldFilters<TypeKind::SMALLINT, common::BigintRange>(
            colIdx, columnName, columnType, columnInfo, filters);
        break;
      case TypeKind::INTEGER:
        constructSubfieldFilters<TypeKind::INTEGER, common::BigintRange>(
            colIdx, columnName, columnType, columnInfo, filters);
        break;
      case TypeKind::BIGINT:
        constructSubfieldFilters<TypeKind::BIGINT, common::BigintRange>(
            colIdx, columnName, columnType, columnInfo, filters);
        break;
      case TypeKind::REAL:
        constructSubfieldFilters<TypeKind::REAL, common::Filter>(colIdx, columnName, columnType, columnInfo, filters);
        break;
      case TypeKind::DOUBLE:
        constructSubfieldFilters<TypeKind::DOUBLE, common::Filter>(
            colIdx, columnName, columnType, columnInfo, filters);
        break;
      case TypeKind::BOOLEAN:
        constructSubfieldFilters<TypeKind::BOOLEAN, common::BoolValue>(
            colIdx, columnName, columnType, columnInfo, filters);
        break;
      case TypeKind::VARCHAR:
        constructSubfieldFilters<TypeKind::VARCHAR, common::Filter>(
            colIdx, columnName, columnType, columnInfo, filters);
        break;
      case TypeKind::HUGEINT:
        constructSubfieldFilters<TypeKind::HUGEINT, common::HugeintRange>(
            colIdx, columnName, columnType, columnInfo, filters);
        break;
      case TypeKind::ARRAY:
        constructSubfieldFilters<TypeKind::ARRAY, common::Filter>(colIdx, columnName, columnType, columnInfo, filters);
        break;
      case TypeKind::MAP:
        constructSubfieldFilters<TypeKind::MAP, common::Filter>(colIdx, columnName, columnType, columnInfo, filters);
        break;
      default:
        VELOX_NYI(
            "Subfield filters creation not supported for input type '{}' in mapToFilters", columnType->toString());
    }
  }

  return filters;
}

/// Converts the given conditions to Velox expressions and chains them with
/// "and".
///
/// @param inputNameList Column names of the input row type.
/// @param inputTypeList Column types of the input row type.
/// @param scalarFunctions Scalar function conditions.
/// @param singularOrLists IN-list conditions.
/// @param ifThens If-then conditions.
/// @return nullptr when all three condition lists are empty; otherwise the
/// conjunction of all converted conditions (conversions that yield nullptr
/// are skipped). Fails if no condition converts to a non-null expression.
core::TypedExprPtr SubstraitToVeloxPlanConverter::connectWithAnd(
    std::vector<std::string> inputNameList,
    std::vector<TypePtr> inputTypeList,
    const std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
    const std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
    const std::vector<::substrait::Expression_IfThen>& ifThens) {
  if (scalarFunctions.empty() && singularOrLists.empty() && ifThens.empty()) {
    return nullptr;
  }
  auto inputType = ROW(std::move(inputNameList), std::move(inputTypeList));

  std::vector<core::TypedExprPtr> allFilters;
  allFilters.reserve(scalarFunctions.size() + singularOrLists.size() + ifThens.size());

  // Iterate by const reference: copying each protobuf message is unnecessary.
  for (const auto& scalar : scalarFunctions) {
    auto filter = exprConverter_->toVeloxExpr(scalar, inputType);
    if (filter != nullptr) {
      allFilters.emplace_back(std::move(filter));
    }
  }

  for (const auto& orList : singularOrLists) {
    auto filter = exprConverter_->toVeloxExpr(orList, inputType);
    if (filter != nullptr) {
      allFilters.emplace_back(std::move(filter));
    }
  }

  for (const auto& ifThen : ifThens) {
    auto filter = exprConverter_->toVeloxExpr(ifThen, inputType);
    if (filter != nullptr) {
      allFilters.emplace_back(std::move(filter));
    }
  }

  VELOX_CHECK_GT(allFilters.size(), 0, "One filter should be valid.");

  // Fold the filters left to right: ((f0 and f1) and f2) ...
  core::TypedExprPtr andFilter = allFilters[0];
  for (size_t i = 1; i < allFilters.size(); ++i) {
    andFilter = connectWithAnd(andFilter, allFilters[i]);
  }

  return andFilter;
}

/// Creates a boolean "and" call expression over the two given expressions,
/// with 'leftExpr' as the first input and 'rightExpr' as the second.
core::TypedExprPtr SubstraitToVeloxPlanConverter::connectWithAnd(
    core::TypedExprPtr leftExpr,
    core::TypedExprPtr rightExpr) {
  std::vector<core::TypedExprPtr> operands;
  operands.reserve(2);
  operands.push_back(leftExpr);
  operands.push_back(rightExpr);
  return std::make_shared<const core::CallTypedExpr>(BOOLEAN(), std::move(operands), "and");
}

/// Tells whether an IN list may be pushed down.
///
/// Every option has to be a literal (checked), and only i32, i64 and string
/// literals are accepted. With 'disableIntLike' set, i32 and i64 literals are
/// rejected as well. Finally the tested value has to be a field selection.
bool SubstraitToVeloxPlanConverter::canPushdownSingularOrList(
    const ::substrait::Expression_SingularOrList& singularOrList,
    bool disableIntLike) {
  VELOX_CHECK(singularOrList.options_size() > 0, "At least one option is expected.");

  for (const auto& candidate : singularOrList.options()) {
    VELOX_CHECK(candidate.has_literal(), "Literal is expected as option.");
    // Only BigintValues and BytesValues are supported.
    switch (candidate.literal().literal_type_case()) {
      case ::substrait::Expression_Literal::LiteralTypeCase::kI32:
      case ::substrait::Expression_Literal::LiteralTypeCase::kI64:
        // BigintMultiRange can only accept BigintRange, so disableIntLike is
        // set to true for OR pushdown of int-like types.
        if (disableIntLike) {
          return false;
        }
        break;
      case ::substrait::Expression_Literal::LiteralTypeCase::kString:
        break;
      default:
        return false;
    }
  }

  // The value being tested must be a field.
  return singularOrList.value().has_selection();
}

/// Returns the column index referenced by the value of an IN list.
///
/// The value is either a field selection, or a scalar function whose first
/// argument is read as the field selection. Any other kind of value fails.
uint32_t SubstraitToVeloxPlanConverter::getColumnIndexFromSingularOrList(
    const ::substrait::Expression_SingularOrList& singularOrList) {
  const auto& value = singularOrList.value();

  // The selection is read in place instead of being copied into a local
  // protobuf message.
  if (value.has_scalar_function()) {
    const auto& function = value.scalar_function();
    VELOX_CHECK(function.arguments_size() > 0, "At least one argument is expected.");
    return SubstraitParser::parseReferenceSegment(function.arguments()[0].value().selection().direct_reference());
  }
  if (value.has_selection()) {
    return SubstraitParser::parseReferenceSegment(value.selection().direct_reference());
  }
  VELOX_FAIL("Unsupported type in IN pushdown.");
}

/// Records the values of an IN list in the filter info of the column the list
/// refers to. Every option of the list has to be a literal.
void SubstraitToVeloxPlanConverter::setFilterInfo(
    const ::substrait::Expression_SingularOrList& singularOrList,
    std::vector<FilterInfo>& columnToFilterInfo) {
  VELOX_CHECK(singularOrList.options_size() > 0, "At least one option is expected.");

  // Column the IN list applies to.
  const uint32_t column = getColumnIndexFromSingularOrList(singularOrList);

  // Convert every literal option to a variant.
  const auto& candidates = singularOrList.options();
  std::vector<variant> inValues;
  inValues.reserve(candidates.size());
  for (const auto& candidate : candidates) {
    VELOX_CHECK(candidate.has_literal(), "Literal is expected as option.");
    inValues.emplace_back(exprConverter_->toVeloxExpr(candidate.literal())->value());
  }

  // Hand the value list to the filter info of the column.
  columnToFilterInfo[column].setValues(inValues);
}

} // namespace gluten
