private boolean canSpecializeMapJoin()

in ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java [3582:4000]


  /**
   * Gathers everything needed to decide whether a MapJoin can use a specialized (native)
   * vectorized implementation, records it on {@code vectorDesc}, and returns the decision.
   *
   * <p>The individual condition flags and the populated {@link VectorMapJoinInfo} are stored on
   * {@code vectorDesc} regardless of the outcome so that EXPLAIN can display both the met and
   * the not met conditions.
   *
   * <p>Side effects visible in this method: scratch columns are allocated on {@code vContext}
   * for small table results, and {@code clearNotVectorizedReason()} is called when a small table
   * expression fails validation.
   *
   * @param op the operator being examined; must be a {@link MapJoinOperator}
   * @param desc the map join descriptor supplying keys, value expressions, filters, retain
   *     lists and value indices
   * @param isTez flag supplied by the caller (presumably true when the execution engine is
   *     Tez -- confirm against the caller); specialization is rejected when it is false
   * @param vContext vectorization context used to build vector expressions and to allocate
   *     scratch columns
   * @param vectorDesc receives the condition flags, the big table expressions and the
   *     {@link VectorMapJoinInfo}
   * @return {@code true} when every condition checked here is met, {@code false} otherwise
   * @throws HiveException declared for the vector expression construction calls made here
   *     (presumably raised when an expression cannot be vectorized)
   */
  private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoinDesc desc,
      boolean isTez, VectorizationContext vContext, VectorMapJoinDesc vectorDesc)
          throws HiveException {

    Preconditions.checkState(op instanceof MapJoinOperator);

    VectorMapJoinInfo vectorMapJoinInfo = new VectorMapJoinInfo();

    boolean isVectorizationMapJoinNativeEnabled = HiveConf.getBoolVar(hiveConf,
        HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED);

    String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);

    boolean oneMapJoinCondition = (desc.getConds().length == 1);

    boolean hasNullSafes = onExpressionHasNullSafes(desc);

    byte posBigTable = (byte) desc.getPosBigTable();

    // Since we want to display all the met and not met conditions in EXPLAIN, we determine all
    // information first....

    List<ExprNodeDesc> keyDesc = desc.getKeys().get(posBigTable);

    boolean outerJoinHasNoKeys = (!desc.isNoOuterJoin() && keyDesc.size() == 0);

    // For now, we don't support joins on or using DECIMAL_64.
    VectorExpression[] allBigTableKeyExpressions =
        vContext.getVectorExpressionsUpConvertDecimal64(keyDesc);
    final int allBigTableKeyExpressionsLength = allBigTableKeyExpressions.length;
    boolean supportsKeyTypes = true;  // Assume.
    HashSet<String> notSupportedKeyTypes = new HashSet<>();

    // Since a key expression can be a calculation and the key will go into a scratch column,
    // we need the mapping and type information.
    int[] bigTableKeyColumnMap = new int[allBigTableKeyExpressionsLength];
    String[] bigTableKeyColumnNames = new String[allBigTableKeyExpressionsLength];
    TypeInfo[] bigTableKeyTypeInfos = new TypeInfo[allBigTableKeyExpressionsLength];
    ArrayList<VectorExpression> bigTableKeyExpressionsList = new ArrayList<>();
    VectorExpression[] slimmedBigTableKeyExpressions;
    for (int i = 0; i < allBigTableKeyExpressionsLength; i++) {
      VectorExpression ve = allBigTableKeyExpressions[i];

      // Only expressions that are more than a plain column reference are kept in the
      // "slimmed" list.
      if (!IdentityExpression.isColumnOnly(ve)) {
        bigTableKeyExpressionsList.add(ve);
      }
      bigTableKeyColumnMap[i] = ve.getOutputColumnNum();

      ExprNodeDesc exprNode = keyDesc.get(i);
      bigTableKeyColumnNames[i] = exprNode.toString();

      TypeInfo typeInfo = exprNode.getTypeInfo();
      // Verify we handle the key column types for an optimized table.  This is effectively the
      // same check used in HashTableLoader.
      if (!MapJoinKey.isSupportedField(typeInfo)) {
        supportsKeyTypes = false;

        // Report the primitive category for primitives and the general category otherwise.
        Category category = typeInfo.getCategory();
        notSupportedKeyTypes.add(
            (category != Category.PRIMITIVE ? category.toString() :
              ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory().toString()));
      }
      bigTableKeyTypeInfos[i] = typeInfo;
    }
    if (bigTableKeyExpressionsList.isEmpty()) {
      slimmedBigTableKeyExpressions = null;
    } else {
      slimmedBigTableKeyExpressions = bigTableKeyExpressionsList.toArray(new VectorExpression[0]);
    }

    List<ExprNodeDesc> bigTableExprs = desc.getExprs().get(posBigTable);

    // NOTE(review): unlike the key expressions above, the value expressions are created without
    // the DECIMAL_64 up-conversion -- confirm that DECIMAL_64 big table values are intended to
    // be passed through as-is.
    VectorExpression[] allBigTableValueExpressions =
        vContext.getVectorExpressions(bigTableExprs);

    boolean isFastHashTableEnabled =
        HiveConf.getBoolVar(hiveConf,
            HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED);

    // Capture the hybrid hash join setting now, especially since LLAP is prone to turn it off
    // in the MapJoinDesc in later physical optimizer stages...
    boolean isHybridHashJoin = desc.isHybridHashJoin();

    /*
     * Populate vectorMapJoininfo.
     */

    /*
     * Similarly, we need a mapping since a value expression can be a calculation and the value
     * will go into a scratch column.
     *
     * Value expressions include keys? YES.
     */
    boolean supportsValueTypes = true;  // Assume.
    HashSet<String> notSupportedValueTypes = new HashSet<>();

    int[] bigTableValueColumnMap = new int[allBigTableValueExpressions.length];
    String[] bigTableValueColumnNames = new String[allBigTableValueExpressions.length];
    TypeInfo[] bigTableValueTypeInfos = new TypeInfo[allBigTableValueExpressions.length];
    ArrayList<VectorExpression> bigTableValueExpressionsList = new ArrayList<>();
    VectorExpression[] slimmedBigTableValueExpressions;
    for (int i = 0; i < bigTableValueColumnMap.length; i++) {
      VectorExpression ve = allBigTableValueExpressions[i];
      if (!IdentityExpression.isColumnOnly(ve)) {
        bigTableValueExpressionsList.add(ve);
      }
      bigTableValueColumnMap[i] = ve.getOutputColumnNum();

      ExprNodeDesc exprNode = bigTableExprs.get(i);
      bigTableValueColumnNames[i] = exprNode.toString();
      TypeInfo typeInfo = exprNode.getTypeInfo();

      // Any value whose type is not a PrimitiveTypeInfo is recorded as not supported.
      if (!(typeInfo instanceof PrimitiveTypeInfo)) {
        supportsValueTypes = false;
        Category category = typeInfo.getCategory();
        notSupportedValueTypes.add(category.toString());
      }
      bigTableValueTypeInfos[i] = typeInfo;
    }
    if (bigTableValueExpressionsList.isEmpty()) {
      slimmedBigTableValueExpressions = null;
    } else {
      slimmedBigTableValueExpressions =
          bigTableValueExpressionsList.toArray(new VectorExpression[0]);
    }

    vectorMapJoinInfo.setBigTableKeyColumnMap(bigTableKeyColumnMap);
    vectorMapJoinInfo.setBigTableKeyColumnNames(bigTableKeyColumnNames);
    vectorMapJoinInfo.setBigTableKeyTypeInfos(bigTableKeyTypeInfos);
    vectorMapJoinInfo.setSlimmedBigTableKeyExpressions(slimmedBigTableKeyExpressions);

    vectorDesc.setAllBigTableKeyExpressions(allBigTableKeyExpressions);

    vectorMapJoinInfo.setBigTableValueColumnMap(bigTableValueColumnMap);
    vectorMapJoinInfo.setBigTableValueColumnNames(bigTableValueColumnNames);
    vectorMapJoinInfo.setBigTableValueTypeInfos(bigTableValueTypeInfos);
    vectorMapJoinInfo.setSlimmedBigTableValueExpressions(slimmedBigTableValueExpressions);

    vectorDesc.setAllBigTableValueExpressions(allBigTableValueExpressions);

    /*
     * Column mapping.
     */
    VectorColumnOutputMapping bigTableRetainMapping =
        new VectorColumnOutputMapping("Big Table Retain Mapping");

    VectorColumnOutputMapping nonOuterSmallTableKeyMapping =
        new VectorColumnOutputMapping("Non Outer Small Table Key Key Mapping");

    VectorColumnOutputMapping outerSmallTableKeyMapping =
        new VectorColumnOutputMapping("Outer Small Table Key Mapping");

    VectorColumnSourceMapping fullOuterSmallTableKeyMapping =
        new VectorColumnSourceMapping("Full Outer Small Table Key Mapping");

    // The order of the fields in the LazyBinary small table value must be used, so
    // we use the source ordering flavor for the mapping.
    VectorColumnSourceMapping smallTableValueMapping =
        new VectorColumnSourceMapping("Small Table Value Mapping");

    // The small table is whichever of the first two tags is not the big table.
    Byte[] order = desc.getTagOrder();
    Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]);
    boolean isOuterJoin = !desc.getNoOuterJoin();

    /*
     * Gather up big and small table output result information from the MapJoinDesc.
     */
    List<Integer> bigTableRetainList = desc.getRetainList().get(posBigTable);

    int[] smallTableIndices;
    int smallTableIndicesSize;
    List<ExprNodeDesc> smallTableExprs = desc.getExprs().get(posSingleVectorMapJoinSmallTable);
    if (desc.getValueIndices() != null && desc.getValueIndices().get(posSingleVectorMapJoinSmallTable) != null) {
      smallTableIndices = desc.getValueIndices().get(posSingleVectorMapJoinSmallTable);
      smallTableIndicesSize = smallTableIndices.length;
    } else {
      smallTableIndices = null;
      smallTableIndicesSize = 0;
    }

    List<Integer> smallTableRetainList = desc.getRetainList().get(posSingleVectorMapJoinSmallTable);
    int smallTableRetainSize = smallTableRetainList.size();

    // The value indices take precedence over the retain list when sizing the small table
    // result area.
    int smallTableResultSize = 0;
    if (smallTableIndicesSize > 0) {
      smallTableResultSize = smallTableIndicesSize;
    } else if (smallTableRetainSize > 0) {
      smallTableResultSize = smallTableRetainSize;
    }

    /*
     * Determine the big table retained mapping first so we can optimize out (with
     * projection) copying inner join big table keys in the subsequent small table results section.
     */

    // We use a mapping object here so we can build the projection in any order and
    // get the ordered by 0 to n-1 output columns at the end.
    //
    // Also, to avoid copying a big table key into the small table result area for inner joins,
    // we reference it with the projection so there can be duplicate output columns
    // in the projection.
    VectorColumnSourceMapping projectionMapping = new VectorColumnSourceMapping("Projection Mapping");

    // Big table output columns come first when the big table is first in the tag order;
    // otherwise they follow the small table result area.
    int nextOutputColumn = (order[0] == posBigTable ? 0 : smallTableResultSize);

    final int bigTableRetainSize = bigTableRetainList.size();
    for (int i = 0; i < bigTableRetainSize; i++) {

      // Since bigTableValueExpressions may do a calculation and produce a scratch column, we
      // need to map to the right batch column.

      int retainColumn = bigTableRetainList.get(i);
      int batchColumnIndex = bigTableValueColumnMap[retainColumn];

      // Look up the type by the retained column (not by the loop position) so that it always
      // describes the same value expression as batchColumnIndex does.
      TypeInfo typeInfo = bigTableValueTypeInfos[retainColumn];

      // With this map we project the big table batch to make it look like an output batch.
      projectionMapping.add(nextOutputColumn, batchColumnIndex, typeInfo);

      // Collect columns we copy from the big table batch to the overflow batch.
      if (!bigTableRetainMapping.containsOutputColumn(batchColumnIndex)) {

        // Tolerate repeated use of a big table column.
        bigTableRetainMapping.add(batchColumnIndex, batchColumnIndex, typeInfo);
      }

      nextOutputColumn++;
    }

    /*
     * Now determine the small table results.
     */
    boolean smallTableExprVectorizes = true;

    int firstSmallTableOutputColumn;
    firstSmallTableOutputColumn = (order[0] == posBigTable ? bigTableRetainSize : 0);
    nextOutputColumn = firstSmallTableOutputColumn;

    // Small table indices has more information (i.e. keys) than retain, so use it if it exists...
    if (smallTableIndicesSize > 0) {

      for (int i = 0; i < smallTableIndicesSize; i++) {
        if (smallTableIndices[i] >= 0) {
          // Zero and above numbers indicate a big table key is needed for
          // small table result "area".

          int keyIndex = smallTableIndices[i];

          // Since bigTableKeyExpressions may do a calculation and produce a scratch column, we
          // need to map the right column.
          int bigTableKeyColumn = bigTableKeyColumnMap[keyIndex];
          TypeInfo typeInfo = bigTableKeyTypeInfos[keyIndex];

          if (!isOuterJoin) {

            // Optimize inner join keys of small table results.

            // Project the big table key into the small table result "area".
            projectionMapping.add(nextOutputColumn, bigTableKeyColumn, typeInfo);

            if (!bigTableRetainMapping.containsOutputColumn(bigTableKeyColumn)) {

              // When the Big Key is not retained in the output result, we do need to copy the
              // Big Table key into the overflow batch so the projection of it (Big Table key) to
              // the Small Table key will work properly...
              //
              nonOuterSmallTableKeyMapping.add(bigTableKeyColumn, bigTableKeyColumn, typeInfo);
            }
          } else {

            // For outer joins, since the small table key can be null when there is a NOMATCH,
            // we must have a physical (scratch) column for those keys.  We cannot use the
            // projection optimization used by non-[FULL] OUTER joins above.

            int scratchColumn = vContext.allocateScratchColumn(typeInfo);
            projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);

            outerSmallTableKeyMapping.add(bigTableKeyColumn, scratchColumn, typeInfo);

            // For FULL OUTER MapJoin, we need to be able to deserialize a Small Table key
            // into the output result.
            fullOuterSmallTableKeyMapping.add(keyIndex, scratchColumn, typeInfo);
          }
        } else {

          // Negative numbers indicate a column to be (deserialize) read from the small table's
          // LazyBinary value row.
          int smallTableValueIndex = -smallTableIndices[i] - 1;

          ExprNodeDesc smallTableExprNode = smallTableExprs.get(i);
          if (!validateExprNodeDesc(smallTableExprNode, "Small Table")) {
            // The failure is remembered in smallTableExprVectorizes instead of being left as a
            // not-vectorized reason.
            clearNotVectorizedReason();
            smallTableExprVectorizes = false;
          }

          TypeInfo typeInfo = smallTableExprNode.getTypeInfo();

          // Make a new big table scratch column for the small table value.
          int scratchColumn = vContext.allocateScratchColumn(typeInfo);
          projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);

          smallTableValueMapping.add(smallTableValueIndex, scratchColumn, typeInfo);
        }
        nextOutputColumn++;
      }
    } else if (smallTableRetainSize > 0) {
      // Only small table values appear in join output result.

      for (int i = 0; i < smallTableRetainSize; i++) {
        int smallTableValueIndex = smallTableRetainList.get(i);

        ExprNodeDesc smallTableExprNode = smallTableExprs.get(i);
        if (!validateExprNodeDesc(smallTableExprNode, "Small Table")) {
          // The failure is remembered in smallTableExprVectorizes instead of being left as a
          // not-vectorized reason.
          clearNotVectorizedReason();
          smallTableExprVectorizes = false;
        }

        // Make a new big table scratch column for the small table value.
        TypeInfo typeInfo = smallTableExprNode.getTypeInfo();
        int scratchColumn = vContext.allocateScratchColumn(typeInfo);

        projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);

        smallTableValueMapping.add(smallTableValueIndex, scratchColumn, typeInfo);
        nextOutputColumn++;
      }
    }

    // Big table filters are vectorized in FILTER mode.
    Map<Byte, List<ExprNodeDesc>> filterExpressions = desc.getFilters();
    VectorExpression[] bigTableFilterExpressions =
        vContext.getVectorExpressions(
            filterExpressions.get(posBigTable),
            VectorExpressionDescriptor.Mode.FILTER);
    vectorMapJoinInfo.setBigTableFilterExpressions(bigTableFilterExpressions);

    boolean useOptimizedTable =
        HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAPJOIN_USE_OPTIMIZED_TABLE);

    // Remember the condition variables for EXPLAIN regardless of whether we specialize or not.
    vectorDesc.setVectorMapJoinInfo(vectorMapJoinInfo);

    vectorDesc.setUseOptimizedTable(useOptimizedTable);
    vectorDesc.setIsVectorizationMapJoinNativeEnabled(isVectorizationMapJoinNativeEnabled);
    vectorDesc.setEngine(engine);
    vectorDesc.setOneMapJoinCondition(oneMapJoinCondition);
    vectorDesc.setHasNullSafes(hasNullSafes);
    vectorDesc.setSmallTableExprVectorizes(smallTableExprVectorizes);
    vectorDesc.setOuterJoinHasNoKeys(outerJoinHasNoKeys);

    vectorDesc.setIsFastHashTableEnabled(isFastHashTableEnabled);
    vectorDesc.setIsHybridHashJoin(isHybridHashJoin);

    vectorDesc.setSupportsKeyTypes(supportsKeyTypes);
    if (!supportsKeyTypes) {
      vectorDesc.setNotSupportedKeyTypes(new ArrayList<>(notSupportedKeyTypes));
    }
    vectorDesc.setSupportsValueTypes(supportsValueTypes);
    if (!supportsValueTypes) {
      vectorDesc.setNotSupportedValueTypes(new ArrayList<>(notSupportedValueTypes));
    }

    // Check common conditions for both Optimized and Fast Hash Tables.
    boolean result = true;    // Assume.
    if (!useOptimizedTable ||
        !isVectorizationMapJoinNativeEnabled ||
        !isTez ||
        !oneMapJoinCondition ||
        hasNullSafes ||
        !smallTableExprVectorizes ||
        outerJoinHasNoKeys ||
        !supportsValueTypes) {
      result = false;
    }

    // supportsKeyTypes is only a restriction when the fast hash table is not enabled.

    if (!isFastHashTableEnabled) {

      // Check optimized-only hash table restrictions.
      if (!supportsKeyTypes) {
        result = false;
      }

    } else {

      // With the fast hash table implementation, we currently do not support
      // Hybrid Grace Hash Join.

      if (isHybridHashJoin) {
        result = false;
      }
    }

    // Convert dynamic arrays and maps to simple arrays.

    bigTableRetainMapping.finalize();
    vectorMapJoinInfo.setBigTableRetainColumnMap(bigTableRetainMapping.getOutputColumns());
    vectorMapJoinInfo.setBigTableRetainTypeInfos(bigTableRetainMapping.getTypeInfos());

    nonOuterSmallTableKeyMapping.finalize();
    vectorMapJoinInfo.setNonOuterSmallTableKeyColumnMap(nonOuterSmallTableKeyMapping.getOutputColumns());
    vectorMapJoinInfo.setNonOuterSmallTableKeyTypeInfos(nonOuterSmallTableKeyMapping.getTypeInfos());

    outerSmallTableKeyMapping.finalize();
    fullOuterSmallTableKeyMapping.finalize();

    vectorMapJoinInfo.setOuterSmallTableKeyMapping(outerSmallTableKeyMapping);
    vectorMapJoinInfo.setFullOuterSmallTableKeyMapping(fullOuterSmallTableKeyMapping);

    smallTableValueMapping.finalize();

    vectorMapJoinInfo.setSmallTableValueMapping(smallTableValueMapping);

    projectionMapping.finalize();

    // Verify we added an entry for each output.
    assert projectionMapping.isSourceSequenceGood();

    vectorMapJoinInfo.setProjectionMapping(projectionMapping);

    return result;
  }