public boolean sharedWorkOptimization()

in ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java [363:633]


  /**
   * Tries to fold each table scan in {@code tableScans} into an earlier scan that is still retained,
   * using the checks and the merge strategy selected by {@code mode}.
   *
   * <p>Scans are visited in list order. Every scan is compared against the scans retained so far
   * (kept in insertion order). At the first candidate that passes all checks the scan is merged into
   * that candidate and no further candidates are tried. A scan that was not merged is added to the
   * retained set and becomes a merge target for the scans that follow it.
   *
   * <p>Side effects visible in this method: operators are re-parented and removed from the operator
   * tree, {@code optimizerCache} is updated accordingly, entries of {@code pctx.getTopOps()} whose
   * scan has no children are dropped, and all removed operators are deleted from {@code tableScans}.
   *
   * @param pctx parse context; used for the top operators, the semijoin branch info and the
   *        configuration
   * @param optimizerCache shared work optimizer cache (work groups, DPP sources) that is updated
   *        as operators are merged or removed
   * @param tableScans candidate table scans; removed scans (including scans that have no children)
   *        are deleted from this list before returning
   * @param mode merge strategy: {@code RemoveSemijoin}, {@code DPPUnion} or {@code SubtreeMerge}
   * @param schemaMerge when {@code false}, pairs for which {@code compatibleSchema} fails are skipped
   * @return {@code true} if at least one pair of table scans was merged
   * @throws SemanticException declared by this method; presumably propagated from the helper checks
   * @throws RuntimeException if {@code mode} is none of the handled values, or if more than one
   *         discardable operator is reported on the single-scan merge path
   */
  public boolean sharedWorkOptimization(ParseContext pctx, SharedWorkOptimizerCache optimizerCache,
      List<TableScanOperator> tableScans, Mode mode, boolean schemaMerge) throws SemanticException {
    // Boolean to keep track of whether this method actually merged any TS operators
    boolean mergedExecuted = false;

    // Scans that survived so far; they are the merge targets, tried in insertion order
    Set<TableScanOperator> retainedScans = new LinkedHashSet<>();
    // Every operator taken out of the plan by this invocation (table scans and others)
    Set<Operator<?>> removedOps = new HashSet<>();
    for (TableScanOperator discardableTsOp : tableScans) {
      TableName tableName1 = discardableTsOp.getTableName();
        // A scan without children is treated as already removed; it is skipped right below
        if (discardableTsOp.getNumChild() == 0) {
          removedOps.add(discardableTsOp);
        }
      if (removedOps.contains(discardableTsOp)) {
        LOG.debug("Skip {} as it has already been removed", discardableTsOp);
        continue;
      }
      for (TableScanOperator retainableTsOp : retainedScans) {
        // Scans that already belong to the same work group are not merged with each other
        if (optimizerCache.getWorkGroup(discardableTsOp).contains(retainableTsOp)) {
          LOG.trace("No need check further {} and {} are in the same group", discardableTsOp, retainableTsOp);
          continue;
        }
        if (removedOps.contains(retainableTsOp)) {
          LOG.debug("Skip {} as it has already been removed", retainableTsOp);
          continue;
        }
        LOG.debug("Can we merge {} into {} to remove a scan on {}?", discardableTsOp, retainableTsOp, tableName1);

        // Assigned by exactly one of the mode branches below
        SharedResult sr;

        // If Iceberg metadata tables are in the query, disable this optimisation.
        String metaTable1 = retainableTsOp.getConf().getTableMetadata().getMetaTable();
        String metaTable2 = discardableTsOp.getConf().getTableMetadata().getMetaTable();
        if (metaTable1 != null || metaTable2 != null) {
          LOG.info("Skip the schema merging as the query contains Iceberg metadata table.");
          continue;
        }

        // The schema compatibility check is only enforced when schema merging is disabled
        if (!schemaMerge && !compatibleSchema(retainableTsOp, discardableTsOp)) {
          LOG.debug("incompatible schemas: {} {} for {} (and merge disabled)", discardableTsOp, retainableTsOp,
              tableName1);
          continue;
        }

        if (mode == Mode.RemoveSemijoin) {
          // We check if the two table scan operators can actually be merged modulo SJs.
          // Hence, two conditions should be met:
          // (i) the TS ops should be mergeable excluding any kind of DPP, and
          // (ii) the DPP branches (excluding SJs) should be the same
          boolean mergeable = areMergeable(pctx, retainableTsOp, discardableTsOp);
          if (!mergeable) {
            // Skip
            LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp);
            continue;
          }
          boolean validMerge =
              areMergeableExcludeSemijoinsExtendedCheck(pctx, optimizerCache, retainableTsOp, discardableTsOp);
          if (!validMerge) {
            // Skip
            LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp);
            continue;
          }

          // If tests pass, we create the shared work optimizer additional information
          // about the part of the tree that can be merged. We need to regenerate the
          // cache because semijoin operators have been removed
          sr = extractSharedOptimizationInfoForRoot(pctx, optimizerCache, retainableTsOp, discardableTsOp, true, true);
        } else if (mode == Mode.DPPUnion) {
          boolean mergeable = areMergeable(pctx, retainableTsOp, discardableTsOp);
          if (!mergeable) {
            LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp);
            continue;
          }
          boolean validMerge = areMergeableDppUnion(pctx, optimizerCache, retainableTsOp, discardableTsOp);
          if (!validMerge) {
            // Skip
            LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp);
            continue;
          }

          // If tests pass, we create the shared work optimizer additional information
          // about the part of the tree that can be merged. We need to regenerate the
          // cache because semijoin operators have been removed
          // Note: unlike the other two modes, both trailing flags are false here
          sr = extractSharedOptimizationInfoForRoot(pctx, optimizerCache, retainableTsOp, discardableTsOp, false,
              false);
          if (!validPreConditions(pctx, optimizerCache, sr)) {
            continue;
          }
        } else if (mode == Mode.SubtreeMerge) {
          // First we quickly check if the two table scan operators can actually be merged
          if (!areMergeable(pctx, retainableTsOp, discardableTsOp)
              || !areMergeableExtendedCheck(pctx, optimizerCache, retainableTsOp, discardableTsOp)) {
            // Skip
            LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp);
            continue;
          }

          // Secondly, we extract information about the part of the tree that can be merged
          // as well as some structural information (memory consumption) that needs to be
          // used to determined whether the merge can happen
          sr = extractSharedOptimizationInfoForRoot(pctx, optimizerCache, retainableTsOp, discardableTsOp, true, true);

          // It seems these two operators can be merged.
          // Check that plan meets some preconditions before doing it.
          // In particular, in the presence of map joins in the upstream plan:
          // - we cannot exceed the noconditional task size, and
          // - if we already merged the big table, we cannot merge the broadcast
          // tables.
          if (!validPreConditions(pctx, optimizerCache, sr)) {
            // Skip
            LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp);
            continue;
          }
        } else {
          throw new RuntimeException("unhandled mode: " + mode);
        }

        // We can merge
        mergedExecuted = true;
        if (mode != Mode.DPPUnion && sr.retainableOps.size() > 1) {
          // More than just the TS operator can be shared: hand the children of the last
          // discardable operator over to the last retainable operator
          Operator<?> lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1);
          Operator<?> lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1);
          if (lastDiscardableOp.getNumChild() != 0) {
            // Iterate over a snapshot, because the loop removes entries from the live child list
            List<Operator<? extends OperatorDesc>> allChildren =
                Lists.newArrayList(lastDiscardableOp.getChildOperators());
            for (Operator<? extends OperatorDesc> op : allChildren) {
              lastDiscardableOp.getChildOperators().remove(op);
              op.replaceParent(lastDiscardableOp, lastRetainableOp);
              lastRetainableOp.getChildOperators().add(op);
            }
          }

          LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp);
        } else {
          // Only the table scans themselves are merged (always the case for DPPUnion)

          if (sr.discardableOps.size() > 1) {
            throw new RuntimeException("we can't discard more in this path");
          }

          DecomposedTs modelR = new DecomposedTs(retainableTsOp);
          DecomposedTs modelD = new DecomposedTs(discardableTsOp);

          // Push filter on top of children for retainable
          pushFilterToTopOfTableScan(optimizerCache, modelR);

          if (mode == Mode.RemoveSemijoin || mode == Mode.SubtreeMerge) {
            // For RemoveSemiJoin; this will clear the discardable's semijoin filters
            replaceSemijoinExpressions(discardableTsOp, modelR.getSemiJoinFilter());
          }

          // Rewrite the discardable model to use the retainable scan's alias before pushing its filter
          modelD.replaceTabAlias(discardableTsOp.getConf().getAlias(), retainableTsOp.getConf().getAlias());

          // Push filter on top of children for discardable
          pushFilterToTopOfTableScan(optimizerCache, modelD);


          // Obtain filter for shared TS operator
          // The OR of both normal filters is only built when both scans have one; otherwise
          // exprNode stays null and no normal-filter term is contributed
          ExprNodeDesc exprNode = null;
          if (modelR.normalFilterExpr != null && modelD.normalFilterExpr != null) {
            exprNode = disjunction(modelR.normalFilterExpr, modelD.normalFilterExpr);
          }
          List<ExprNodeDesc> semiJoinExpr = null;
          if (mode == Mode.DPPUnion) {
            // These asserts are only evaluated when the JVM runs with assertions enabled
            assert modelR.semijoinExprNodes != null;
            assert modelD.semijoinExprNodes != null;
            // DPPUnion keeps the semijoin expressions of both scans, OR-ed together
            ExprNodeDesc disjunction =
                disjunction(conjunction(modelR.semijoinExprNodes), conjunction(modelD.semijoinExprNodes));
            semiJoinExpr = disjunction == null ? null : Lists.newArrayList(disjunction);
          } else {
            // Other modes keep only the retainable scan's semijoin expressions
            semiJoinExpr = modelR.semijoinExprNodes;
          }

          // Create expression node that will be used for the retainable table scan
          exprNode = conjunction(semiJoinExpr, exprNode);
          // Replace filter
          // NOTE(review): the cast assumes conjunction(...) yields null or an
          // ExprNodeGenericFuncDesc - confirm against the helper
          retainableTsOp.getConf().setFilterExpr((ExprNodeGenericFuncDesc) exprNode);
          // Replace table scan operator
          adoptChildren(retainableTsOp, discardableTsOp);

          LOG.debug("Merging {} into {}", discardableTsOp, retainableTsOp);
        }

        // First we remove the input operators of the expression that
        // we are going to eliminate
        // (skipped for DPPUnion, which re-points the DPP sources to the retained scan further below)
        if (mode != Mode.DPPUnion) {
          for (Operator<?> op : sr.discardableInputOps) {
            OperatorUtils.removeOperator(op);
            optimizerCache.removeOp(op);
            removedOps.add(op);
            // Remove DPP predicates
            // Only when the target scan is not itself among the operators being discarded
            if (op instanceof ReduceSinkOperator) {
              SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
              if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp())
                  && !sr.discardableInputOps.contains(sjbi.getTsOp())) {
                GenTezUtils.removeSemiJoinOperator(pctx, (ReduceSinkOperator) op, sjbi.getTsOp());
                optimizerCache.tableScanToDPPSource.remove(sjbi.getTsOp(), op);
              }
            } else if (op instanceof AppMasterEventOperator) {
              DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf();
              if (!sr.discardableOps.contains(dped.getTableScan())
                  && !sr.discardableInputOps.contains(dped.getTableScan())) {
                GenTezUtils.removeSemiJoinOperator(pctx, (AppMasterEventOperator) op, dped.getTableScan());
                optimizerCache.tableScanToDPPSource.remove(dped.getTableScan(), op);
              }
            }
            LOG.debug("Input operator removed: {}", op);
          }
        }
        // A shared TSop across branches can not have probeContext that utilizes single branch info
        // Filtered-out rows from one branch might be needed by another branch sharing a TSop
        if (retainableTsOp.getProbeDecodeContext() != null) {
          LOG.debug("Removing probeDecodeCntx for merged TS op {}", retainableTsOp);
          retainableTsOp.setProbeDecodeContext(null);
          retainableTsOp.getConf().setProbeDecodeContext(null);
        }

        // Then we merge the operators of the works we are going to merge
        mergeSchema(discardableTsOp, retainableTsOp);

        if (mode == Mode.DPPUnion) {
          // reparent all
          // i.e. make every DPP source of the discardable scan target the retainable scan instead
          Collection<Operator<?>> discardableDPP = optimizerCache.tableScanToDPPSource.get(discardableTsOp);
          for (Operator<?> op : discardableDPP) {
            if (op instanceof ReduceSinkOperator) {
              // NOTE(review): unlike the lookup in the input-operator removal above, sjInfo is not
              // null-checked here - confirm every ReduceSink DPP source has semijoin branch info
              SemiJoinBranchInfo sjInfo = pctx.getRsToSemiJoinBranchInfo().get(op);
              sjInfo.setTableScan(retainableTsOp);
            } else if (op.getConf() instanceof DynamicPruningEventDesc) {
              DynamicPruningEventDesc dynamicPruningEventDesc = (DynamicPruningEventDesc) op.getConf();
              dynamicPruningEventDesc.setTableScan(retainableTsOp);
            }
          }
          // Move the entries over to the retainable scan, then empty the discardable scan's collection
          // NOTE(review): presumably get(...) returns a live, never-null view (multimap semantics) - confirm
          optimizerCache.tableScanToDPPSource.get(retainableTsOp).addAll(discardableDPP);
          discardableDPP.clear();
        }
        optimizerCache.removeOpAndCombineWork(discardableTsOp, retainableTsOp);

        removedOps.add(discardableTsOp);
        // Finally we remove the expression from the tree
        for (Operator<?> op : sr.discardableOps) {
          OperatorUtils.removeOperator(op);
          optimizerCache.removeOp(op);
          removedOps.add(op);
          LOG.debug("Operator removed: {}", op);
        }

        // Optional follow-up merge below the retained scan; only attempted when exactly one
        // operator was discarded
        if (pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_DOWNSTREAM_MERGE)) {
          if (sr.discardableOps.size() == 1) {
            downStreamMerge(retainableTsOp, optimizerCache, pctx);
          }
        }

        // The discardable scan has been merged; stop trying further retained candidates
        break;
      }

      if (removedOps.contains(discardableTsOp)) {
        // This operator has been removed, remove it from the list of existing operators
        // FIXME: there is no point of this
        retainedScans.remove(discardableTsOp);
      } else {
        // This operator has not been removed, include it in the list of existing operators
        retainedScans.add(discardableTsOp);
      }
    }

    // Remove unused table scan operators
    // (every top-operator entry whose scan has no children left)
    pctx.getTopOps().entrySet().removeIf((Entry<String, TableScanOperator> e) -> e.getValue().getNumChild() == 0);

    // Drop removed scans from the caller's list; non-scan entries of removedOps simply do not match
    tableScans.removeAll(removedOps);

    return mergedExecuted;
  }