in ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java [363:633]
/**
 * Tries to merge table scans from {@code tableScans} into previously retained table scans, so that
 * the same table is read once and its output shared between branches.
 *
 * <p>Scans are visited in list order. Each scan that has not already been removed is a
 * "discardable" candidate and is compared with every scan retained so far (the "retainable"
 * candidates). The first retainable scan it can be merged with, according to {@code mode}, absorbs
 * it and the comparison stops for that candidate. If no merge happens, the candidate is itself
 * retained for comparison with later scans.
 *
 * <p>After the loop, table scans left without children are removed from
 * {@code pctx.getTopOps()}.
 *
 * @param pctx parse context; its configuration, top operators and semijoin branch info are read
 *     and updated
 * @param optimizerCache optimizer cache (work groups, DPP sources); updated as operators are
 *     removed and works are combined
 * @param tableScans candidate table scans; on return, every operator removed by this call is also
 *     removed from this list
 * @param mode merge strategy: {@code RemoveSemijoin}, {@code DPPUnion} or {@code SubtreeMerge}.
 *     Any other value throws a {@link RuntimeException} once a candidate pair reaches the mode
 *     dispatch.
 * @param schemaMerge when {@code false}, pairs whose schemas are not compatible (per
 *     {@code compatibleSchema}) are skipped
 * @return {@code true} if at least one pair of table scans was merged
 * @throws SemanticException declared for the helper checks/merges invoked here
 */
public boolean sharedWorkOptimization(ParseContext pctx, SharedWorkOptimizerCache optimizerCache,
List<TableScanOperator> tableScans, Mode mode, boolean schemaMerge) throws SemanticException {
// Boolean to keep track of whether this method actually merged any TS operators
boolean mergedExecuted = false;
// Scans kept so far, in visit order; these are the candidates that may absorb later scans
Set<TableScanOperator> retainedScans = new LinkedHashSet<>();
// Every operator removed from the plan during this call (TS ops and the subtrees/inputs dropped with them)
Set<Operator<?>> removedOps = new HashSet<>();
for (TableScanOperator discardableTsOp : tableScans) {
TableName tableName1 = discardableTsOp.getTableName();
// A scan with no children feeds nothing: treat it as already removed
if (discardableTsOp.getNumChild() == 0) {
removedOps.add(discardableTsOp);
}
if (removedOps.contains(discardableTsOp)) {
LOG.debug("Skip {} as it has already been removed", discardableTsOp);
continue;
}
// Try each previously retained scan, in the order it was retained, as the merge target
for (TableScanOperator retainableTsOp : retainedScans) {
if (optimizerCache.getWorkGroup(discardableTsOp).contains(retainableTsOp)) {
LOG.trace("No need check further {} and {} are in the same group", discardableTsOp, retainableTsOp);
continue;
}
if (removedOps.contains(retainableTsOp)) {
LOG.debug("Skip {} as it has already been removed", retainableTsOp);
continue;
}
LOG.debug("Can we merge {} into {} to remove a scan on {}?", discardableTsOp, retainableTsOp, tableName1);
SharedResult sr;
// If either scan reads an Iceberg metadata table, this pair is never merged.
// NOTE(review): the log message below says "schema merging", but the whole merge of the pair is skipped.
String metaTable1 = retainableTsOp.getConf().getTableMetadata().getMetaTable();
String metaTable2 = discardableTsOp.getConf().getTableMetadata().getMetaTable();
if (metaTable1 != null || metaTable2 != null) {
LOG.info("Skip the schema merging as the query contains Iceberg metadata table.");
continue;
}
// With schema merging disabled, only scans with compatible schemas can be combined
if (!schemaMerge && !compatibleSchema(retainableTsOp, discardableTsOp)) {
LOG.debug("incompatible schemas: {} {} for {} (and merge disabled)", discardableTsOp, retainableTsOp,
tableName1);
continue;
}
// Mode-specific eligibility checks; each branch either skips the pair (continue) or fills in sr
if (mode == Mode.RemoveSemijoin) {
// We check if the two table scan operators can actually be merged modulo SJs.
// Hence, two conditions should be met:
// (i) the TS ops should be mergeable excluding any kind of DPP, and
// (ii) the DPP branches (excluding SJs) should be the same
boolean mergeable = areMergeable(pctx, retainableTsOp, discardableTsOp);
if (!mergeable) {
// Skip
LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp);
continue;
}
boolean validMerge =
areMergeableExcludeSemijoinsExtendedCheck(pctx, optimizerCache, retainableTsOp, discardableTsOp);
if (!validMerge) {
// Skip
LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp);
continue;
}
// If tests pass, we create the shared work optimizer additional information
// about the part of the tree that can be merged. We need to regenerate the
// cache because semijoin operators have been removed
sr = extractSharedOptimizationInfoForRoot(pctx, optimizerCache, retainableTsOp, discardableTsOp, true, true);
} else if (mode == Mode.DPPUnion) {
boolean mergeable = areMergeable(pctx, retainableTsOp, discardableTsOp);
if (!mergeable) {
LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp);
continue;
}
boolean validMerge = areMergeableDppUnion(pctx, optimizerCache, retainableTsOp, discardableTsOp);
if (!validMerge) {
// Skip
LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp);
continue;
}
// If the checks pass, extract the shared-work information for the mergeable part of the tree.
// NOTE(review): unlike the RemoveSemijoin/SubtreeMerge branches, this call passes false/false for
// the two trailing flags; the "regenerate the cache because semijoin operators have been removed"
// rationale copied from the RemoveSemijoin branch may not apply here. Confirm the intent.
sr = extractSharedOptimizationInfoForRoot(pctx, optimizerCache, retainableTsOp, discardableTsOp, false,
false);
if (!validPreConditions(pctx, optimizerCache, sr)) {
continue;
}
} else if (mode == Mode.SubtreeMerge) {
// First we quickly check if the two table scan operators can actually be merged
if (!areMergeable(pctx, retainableTsOp, discardableTsOp)
|| !areMergeableExtendedCheck(pctx, optimizerCache, retainableTsOp, discardableTsOp)) {
// Skip
LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp);
continue;
}
// Secondly, we extract information about the part of the tree that can be merged
// as well as some structural information (memory consumption) that needs to be
// used to determined whether the merge can happen
sr = extractSharedOptimizationInfoForRoot(pctx, optimizerCache, retainableTsOp, discardableTsOp, true, true);
// It seems these two operators can be merged.
// Check that plan meets some preconditions before doing it.
// In particular, in the presence of map joins in the upstream plan:
// - we cannot exceed the noconditional task size, and
// - if we already merged the big table, we cannot merge the broadcast
// tables.
if (!validPreConditions(pctx, optimizerCache, sr)) {
// Skip
LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp);
continue;
}
} else {
throw new RuntimeException("unhandled mode: " + mode);
}
// We can merge
mergedExecuted = true;
if (mode != Mode.DPPUnion && sr.retainableOps.size() > 1) {
// More than just the TS operator is shared: re-attach the children of the last discardable
// operator under the last retainable operator, so the discardable chain can be dropped below
Operator<?> lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1);
Operator<?> lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1);
if (lastDiscardableOp.getNumChild() != 0) {
// Iterate over a copy because the child list is modified inside the loop
List<Operator<? extends OperatorDesc>> allChildren =
Lists.newArrayList(lastDiscardableOp.getChildOperators());
for (Operator<? extends OperatorDesc> op : allChildren) {
lastDiscardableOp.getChildOperators().remove(op);
op.replaceParent(lastDiscardableOp, lastRetainableOp);
lastRetainableOp.getChildOperators().add(op);
}
}
LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp);
} else {
// Only the TS operators themselves are merged (always the case for DPPUnion): combine their
// filters on the retained TS and let it adopt the discarded TS's children
if (sr.discardableOps.size() > 1) {
throw new RuntimeException("we can't discard more in this path");
}
DecomposedTs modelR = new DecomposedTs(retainableTsOp);
DecomposedTs modelD = new DecomposedTs(discardableTsOp);
// Push filter on top of children for retainable
pushFilterToTopOfTableScan(optimizerCache, modelR);
if (mode == Mode.RemoveSemijoin || mode == Mode.SubtreeMerge) {
// For RemoveSemiJoin; this will clear the discardable's semijoin filters
replaceSemijoinExpressions(discardableTsOp, modelR.getSemiJoinFilter());
}
// Rewrite the discardable side's expressions to use the retained TS alias
modelD.replaceTabAlias(discardableTsOp.getConf().getAlias(), retainableTsOp.getConf().getAlias());
// Push filter on top of children for discardable
pushFilterToTopOfTableScan(optimizerCache, modelD);
// Obtain filter for shared TS operator.
// The filters are OR-ed only when both sides have one; otherwise exprNode stays null
// (presumably because a side without a filter needs every row).
ExprNodeDesc exprNode = null;
if (modelR.normalFilterExpr != null && modelD.normalFilterExpr != null) {
exprNode = disjunction(modelR.normalFilterExpr, modelD.normalFilterExpr);
}
// Semijoin part of the shared filter: for DPPUnion, the OR of each side's AND-ed semijoin
// expressions; for the other modes, the retainable side's semijoin expressions
List<ExprNodeDesc> semiJoinExpr = null;
if (mode == Mode.DPPUnion) {
assert modelR.semijoinExprNodes != null;
assert modelD.semijoinExprNodes != null;
ExprNodeDesc disjunction =
disjunction(conjunction(modelR.semijoinExprNodes), conjunction(modelD.semijoinExprNodes));
semiJoinExpr = disjunction == null ? null : Lists.newArrayList(disjunction);
} else {
semiJoinExpr = modelR.semijoinExprNodes;
}
// Create expression node that will be used for the retainable table scan
exprNode = conjunction(semiJoinExpr, exprNode);
// Replace filter.
// NOTE(review): assumes conjunction(...) yields an ExprNodeGenericFuncDesc (or null); any other
// ExprNodeDesc subtype would fail this cast. Confirm.
retainableTsOp.getConf().setFilterExpr((ExprNodeGenericFuncDesc) exprNode);
// Replace table scan operator
adoptChildren(retainableTsOp, discardableTsOp);
LOG.debug("Merging {} into {}", discardableTsOp, retainableTsOp);
}
// First we remove the input operators of the expression that
// we are going to eliminate
if (mode != Mode.DPPUnion) {
for (Operator<?> op : sr.discardableInputOps) {
OperatorUtils.removeOperator(op);
optimizerCache.removeOp(op);
removedOps.add(op);
// Remove DPP predicates: if the removed op is a semijoin/DPP source whose target TS is not
// itself being discarded, unregister that pruning branch and drop it from the DPP-source cache
if (op instanceof ReduceSinkOperator) {
SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp())
&& !sr.discardableInputOps.contains(sjbi.getTsOp())) {
GenTezUtils.removeSemiJoinOperator(pctx, (ReduceSinkOperator) op, sjbi.getTsOp());
optimizerCache.tableScanToDPPSource.remove(sjbi.getTsOp(), op);
}
} else if (op instanceof AppMasterEventOperator) {
DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf();
if (!sr.discardableOps.contains(dped.getTableScan())
&& !sr.discardableInputOps.contains(dped.getTableScan())) {
GenTezUtils.removeSemiJoinOperator(pctx, (AppMasterEventOperator) op, dped.getTableScan());
optimizerCache.tableScanToDPPSource.remove(dped.getTableScan(), op);
}
}
LOG.debug("Input operator removed: {}", op);
}
}
// A shared TSop across branches can not have probeContext that utilizes single branch info
// Filtered-out rows from one branch might be needed by another branch sharing a TSop
if (retainableTsOp.getProbeDecodeContext() != null) {
LOG.debug("Removing probeDecodeCntx for merged TS op {}", retainableTsOp);
retainableTsOp.setProbeDecodeContext(null);
retainableTsOp.getConf().setProbeDecodeContext(null);
}
// Then we merge the operators of the works we are going to merge
mergeSchema(discardableTsOp, retainableTsOp);
if (mode == Mode.DPPUnion) {
// Re-point every DPP source of the discarded TS at the retained TS, then move those sources
// to the retained TS's entry in the cache
Collection<Operator<?>> discardableDPP = optimizerCache.tableScanToDPPSource.get(discardableTsOp);
for (Operator<?> op : discardableDPP) {
if (op instanceof ReduceSinkOperator) {
// NOTE(review): assumes every ReduceSinkOperator DPP source has semijoin branch info;
// a missing entry would throw a NullPointerException here.
SemiJoinBranchInfo sjInfo = pctx.getRsToSemiJoinBranchInfo().get(op);
sjInfo.setTableScan(retainableTsOp);
} else if (op.getConf() instanceof DynamicPruningEventDesc) {
DynamicPruningEventDesc dynamicPruningEventDesc = (DynamicPruningEventDesc) op.getConf();
dynamicPruningEventDesc.setTableScan(retainableTsOp);
}
}
optimizerCache.tableScanToDPPSource.get(retainableTsOp).addAll(discardableDPP);
// Clearing the collection returned by get(discardableTsOp) is assumed to clear the cache entry
// (a live multimap view). TODO confirm the multimap type.
discardableDPP.clear();
}
optimizerCache.removeOpAndCombineWork(discardableTsOp, retainableTsOp);
removedOps.add(discardableTsOp);
// Finally we remove the expression from the tree
for (Operator<?> op : sr.discardableOps) {
OperatorUtils.removeOperator(op);
optimizerCache.removeOp(op);
removedOps.add(op);
LOG.debug("Operator removed: {}", op);
}
// Optional downstream merge, only attempted when exactly one operator was discarded
if (pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_DOWNSTREAM_MERGE)) {
if (sr.discardableOps.size() == 1) {
downStreamMerge(retainableTsOp, optimizerCache, pctx);
}
}
// A discardable scan is merged into at most one retained scan
break;
}
if (removedOps.contains(discardableTsOp)) {
// This operator has been removed, remove it from the list of existing operators.
// NOTE: discardableTsOp is only added to retainedScans at the end of its own iteration (else
// branch below), so this remove() is a no-op unless tableScans lists the same operator twice.
retainedScans.remove(discardableTsOp);
} else {
// This operator has not been removed, include it in the list of existing operators
retainedScans.add(discardableTsOp);
}
}
// Remove unused table scan operators
pctx.getTopOps().entrySet().removeIf((Entry<String, TableScanOperator> e) -> e.getValue().getNumChild() == 0);
// Let the caller's list reflect the removals made by this call
tableScans.removeAll(removedOps);
return mergedExecuted;
}