private boolean validatePTFOperator()

in ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java [2828:3031]


  /**
   * Decides whether the given PTF (Partitioned Table Function) operator can be vectorized.
   *
   * As a side effect, populates {@code vectorPTFDesc} (via {@code createVectorPTFDesc}) with
   * context-independent information that is used both for the validation below and, later,
   * when the vectorized operator is actually created.
   *
   * On every rejection path this method records the reason via setNodeIssue /
   * setOperatorIssue / setExpressionIssue before returning {@code false}, so callers can
   * report why vectorization was not possible.
   *
   * @param op            the PTF operator being considered for vectorization
   * @param vContext      the current vectorization context (passed through to createVectorPTFDesc)
   * @param vectorPTFDesc descriptor to be filled in with evaluator/output information
   * @return true if the operator is vectorizable, false otherwise
   * @throws HiveException on unexpected planner errors (createVectorPTFDesc failures are
   *         caught and reported as an operator issue instead)
   */
  private boolean validatePTFOperator(PTFOperator op, VectorizationContext vContext,
      VectorPTFDesc vectorPTFDesc)
      throws HiveException {

    // Gate on the configuration flag first; nothing else matters if PTF vectorization is off.
    if (!isPtfVectorizationEnabled) {
      setNodeIssue("Vectorization of PTF is not enabled (" +
          HiveConf.ConfVars.HIVE_VECTORIZATION_PTF_ENABLED.varname + " IS false)");
      return false;
    }
    PTFDesc ptfDesc = op.getConf();
    // Only reducer-side PTF is supported; a map-side PTF cannot be vectorized.
    boolean isMapSide = ptfDesc.isMapSide();
    if (isMapSide) {
      setOperatorIssue("PTF Mapper not supported");
      return false;
    }
    // The PTF must sit directly under a reduce-shuffle (ReduceSinkOperator), optionally with
    // a single interposed SelectOperator. Any other parent shape is rejected.
    List<Operator<? extends OperatorDesc>> ptfParents = op.getParentOperators();
    if (ptfParents != null && ptfParents.size() > 0) {
      Operator<? extends OperatorDesc> ptfParent = op.getParentOperators().get(0);
      if (!(ptfParent instanceof ReduceSinkOperator)) {
        boolean isReduceShufflePtf = false;
        if (ptfParent instanceof SelectOperator) {
          ptfParents = ptfParent.getParentOperators();
          if (ptfParents == null || ptfParents.size() == 0) {
            // A Select with no parent is treated as acceptable (nothing above to disqualify it).
            isReduceShufflePtf = true;
          } else {
            // Look one level up through the Select: its parent must be the reduce sink.
            ptfParent = ptfParent.getParentOperators().get(0);
            isReduceShufflePtf = (ptfParent instanceof ReduceSinkOperator);
          }
        }
        if (!isReduceShufflePtf) {
          setOperatorIssue("Only PTF directly under reduce-shuffle is supported");
          return false;
        }
      }
    }
    // NOOP table functions are not vectorizable.
    boolean forNoop = ptfDesc.forNoop();
    if (forNoop) {
      setOperatorIssue("NOOP not supported");
      return false;
    }
    // Only windowing PTFs are supported (i.e. OVER clauses), not general table functions.
    boolean forWindowing = ptfDesc.forWindowing();
    if (!forWindowing) {
      setOperatorIssue("Windowing required");
      return false;
    }
    PartitionedTableFunctionDef funcDef = ptfDesc.getFuncDef();
    boolean isWindowTableFunctionDef = (funcDef instanceof WindowTableFunctionDef);
    if (!isWindowTableFunctionDef) {
      setOperatorIssue("Must be a WindowTableFunctionDef");
      return false;
    }

    // We collect information in VectorPTFDesc that doesn't need the VectorizationContext.
    // We use this information for validation.  Later when creating the vector operator
    // we create an additional object VectorPTFInfo.

    try {
      createVectorPTFDesc(
          op, ptfDesc, vContext, vectorPTFDesc, vectorizedPTFMaxMemoryBufferingBatchCount);
    } catch (HiveException e) {
      // A failure to build the descriptor is reported as a vectorization issue, not rethrown.
      setOperatorIssue("exception: " + VectorizationContext.getStackTraceAsSingleLine(e));
      return false;
    }

    // Output columns ok?
    // Every output column's type must be a vectorizable projection type (complex types excluded).
    String[] outputColumnNames = vectorPTFDesc.getOutputColumnNames();
    TypeInfo[] outputTypeInfos = vectorPTFDesc.getOutputTypeInfos();
    final int outputCount = outputColumnNames.length;
    for (int i = 0; i < outputCount; i++) {
      String typeName = outputTypeInfos[i].getTypeName();
      boolean ret = validateDataType(typeName, VectorExpressionDescriptor.Mode.PROJECTION, /* allowComplex */ false);
      if (!ret) {
        setExpressionIssue("PTF Output Columns", "Data type " + typeName + " of column " + outputColumnNames[i] + " not supported");
        return false;
      }
    }

    // Per-evaluator validation: these parallel arrays are indexed by evaluator position,
    // as populated by createVectorPTFDesc above.
    boolean[] distinctEvaluator = vectorPTFDesc.getEvaluatorsAreDistinct();
    String[] evaluatorFunctionNames = vectorPTFDesc.getEvaluatorFunctionNames();
    final int count = evaluatorFunctionNames.length;
    WindowFrameDef[] evaluatorWindowFrameDefs = vectorPTFDesc.getEvaluatorWindowFrameDefs();
    List<ExprNodeDesc>[] evaluatorInputExprNodeDescLists = vectorPTFDesc.getEvaluatorInputExprNodeDescLists();

    for (int i = 0; i < count; i++) {
      String functionName = evaluatorFunctionNames[i].toLowerCase();
      // The window function itself must be in the vectorization-supported set.
      SupportedFunctionType supportedFunctionType = VectorPTFDesc.supportedFunctionsMap.get(functionName);
      if (supportedFunctionType == null) {
        setOperatorIssue(functionName + " not in supported functions " + VectorPTFDesc.supportedFunctionNames);
        return false;
      }

      // DISTINCT is only allowed for functions that declare support for it.
      if (distinctEvaluator[i] && !supportedFunctionType.isSupportDistinct()) {
        setOperatorIssue(functionName + " distinct is not supported ");
        return false;
      }

      WindowFrameDef windowFrameDef = evaluatorWindowFrameDefs[i];

      // Precompute single-argument facts used by the ROWS frame check below:
      // a single primitive-typed argument enables the "end = CURRENT ROW" streaming case.
      List<ExprNodeDesc> exprNodeDescList = evaluatorInputExprNodeDescLists[i];
      final boolean isSingleParameter =
          (exprNodeDescList != null &&
          exprNodeDescList.size() == 1);
      final ExprNodeDesc singleExprNodeDesc =
          (isSingleParameter ? exprNodeDescList.get(0) : null);
      final TypeInfo singleTypeInfo =
          (isSingleParameter ? singleExprNodeDesc.getTypeInfo() : null);
      final PrimitiveCategory singlePrimitiveCategory =
          (singleTypeInfo instanceof PrimitiveTypeInfo ?
              ((PrimitiveTypeInfo) singleTypeInfo).getPrimitiveCategory() : null);

      switch (windowFrameDef.getWindowType()) {
      case RANGE:
        // RANGE frames: only an end boundary of CURRENT ROW is vectorizable.
        if (!windowFrameDef.getEnd().isCurrentRow()) {
          setOperatorIssue(functionName + " only CURRENT ROW end frame is supported for RANGE");
          return false;
        }
        break;
      case ROWS:
        {
          // ROWS frames: the end must be UNBOUNDED, except for the special streaming case of
          // AVG/MAX/MIN/SUM over a single primitive argument ending at CURRENT ROW.
          boolean isRowEndCurrent =
              (windowFrameDef.getEnd().isCurrentRow() &&
              (supportedFunctionType == SupportedFunctionType.AVG ||
               supportedFunctionType == SupportedFunctionType.MAX ||
               supportedFunctionType == SupportedFunctionType.MIN ||
               supportedFunctionType == SupportedFunctionType.SUM) &&
              isSingleParameter &&
              singlePrimitiveCategory != null);
          if (!isRowEndCurrent && !windowFrameDef.isEndUnbounded()) {
            setOperatorIssue(
                functionName + " UNBOUNDED end frame is required for ROWS window type");
            return false;
          }
        }
        break;
      default:
        // WindowType currently has only RANGE and ROWS; anything else is a planner bug.
        throw new RuntimeException("Unexpected window type " + windowFrameDef.getWindowType());
      }

      // RANK/DENSE_RANK don't care about columns.
      if (supportedFunctionType != SupportedFunctionType.RANK &&
          supportedFunctionType != SupportedFunctionType.DENSE_RANK) {

        if (exprNodeDescList != null) {
          // LEAD and LAG now supports multiple arguments in vectorized mode
          if (exprNodeDescList.size() > 1 && supportedFunctionType != SupportedFunctionType.LAG
              && supportedFunctionType != SupportedFunctionType.LEAD) {
            setOperatorIssue(
                "More than 1 argument expression of aggregation function " + functionName);
            return false;
          }

          // NOTE(review): assumes a non-null list is non-empty (planner invariant) — an empty
          // list here would throw IndexOutOfBoundsException. TODO confirm upstream guarantee.
          ExprNodeDesc exprNodeDesc = exprNodeDescList.get(0);

          // Nested lead/lag inside an aggregation argument is not vectorizable.
          if (containsLeadLag(exprNodeDesc)) {
            setOperatorIssue("lead and lag function not supported in argument expression of aggregation function " + functionName);
            return false;
          }

          if (supportedFunctionType != SupportedFunctionType.COUNT) {

            // COUNT does not care about column types.  The rest do.
            // Only arguments whose column-vector representation is LONG, DOUBLE, or DECIMAL
            // are supported; all other categories (including non-primitives) are rejected.
            TypeInfo typeInfo = exprNodeDesc.getTypeInfo();
            Category category = typeInfo.getCategory();
            boolean isSupportedType;
            if (category != Category.PRIMITIVE) {
              isSupportedType = false;
            } else {
              ColumnVector.Type colVecType =
                  VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
              switch (colVecType) {
              case LONG:
              case DOUBLE:
              case DECIMAL:
                isSupportedType = true;
                break;
              default:
                isSupportedType = false;
                break;
              }
            }
            if (!isSupportedType) {
              setOperatorIssue(typeInfo.getTypeName() + " data type not supported in argument expression of aggregation function " + functionName);
              return false;
            }
          }
        }
      }
      // Multi-column ORDER BY combined with a RANGE frame that would need a range boundary
      // scanner is not supported in vectorized mode.
      if (vectorPTFDesc.getOrderExprNodeDescs().length > 1) {
        /*
         * Currently, we need to rule out here all cases where a range boundary scanner can run,
         * basically: 1. bounded start 2. bounded end which is not current row
         */
        if (windowFrameDef.getWindowType() == WindowType.RANGE
            && (!windowFrameDef.isStartUnbounded()
                || !(windowFrameDef.getEnd().isCurrentRow() || windowFrameDef.isEndUnbounded()))) {
          setOperatorIssue(
              "Multi-column ordered RANGE boundary scanner is not supported in vectorized mode (window: "
                  + windowFrameDef + ")");
          return false;
        }
      }
    }
    // All checks passed: this PTF operator can be vectorized.
    return true;
  }