in ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java [382:659]
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
// We should not use this optimization if sorted dynamic partition optimizer is used,
// as RS will be required.
if (pGraphContext.isReduceSinkAddedBySortedDynPartition()) {
LOG.info("Reduce Sink is added by Sorted Dynamic Partition Optimizer. Bailing out of" +
" Bucketing Sorting Reduce Sink Optimizer");
return null;
}
// If the reduce sink has not been introduced due to bucketing/sorting, ignore it
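// The reduce sink is expected to be the grandparent of the file sink here
// (a single operator sits between them on the reducer side)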
FileSinkOperator fsOp = (FileSinkOperator) nd;
ReduceSinkOperator rsOp = (ReduceSinkOperator) fsOp.getParentOperators().get(0).getParentOperators().get(0);
List<ReduceSinkOperator> rsOps = pGraphContext
.getReduceSinkOperatorsAddedByEnforceBucketingSorting();
// nothing to do
if ((rsOps != null) && (!rsOps.contains(rsOp))) {
return null;
}
// Don't do this optimization with updates or deletes
if (fsOp.getConf().getWriteType() == AcidUtils.Operation.UPDATE ||
fsOp.getConf().getWriteType() == AcidUtils.Operation.DELETE) {
return null;
}
if (stack.get(0) instanceof TableScanOperator) {
TableScanOperator tso = ((TableScanOperator)stack.get(0));
Table tab = tso.getConf().getTableMetadata();
if (AcidUtils.isFullAcidTable(tab)) {
/* ACID tables have a complex directory layout and require merging of delta files
* on read, thus we should not try to read bucket files directly. */
return null;
} else if (AcidUtils.isInsertOnlyTable(tab.getParameters())) {
// Do not support MM tables either at this point. We could do it with some extra logic.
return null;
}
}
// Support for dynamic partitions can be added later
if (fsOp.getConf().getDynPartCtx() != null) {
return null;
}
// All reduce keys must be plain column references; otherwise no conversion is possible
for (ExprNodeDesc keyCol : rsOp.getConf().getKeyCols()) {
if (!(keyCol instanceof ExprNodeColumnDesc)) {
return null;
}
}
Table destTable = fsOp.getConf().getTable();
if (destTable == null) {
return null;
}
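// Capture the destination bucket count - the source side must match it (verified in
// checkPartition/checkTable below)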
int numBucketsDestination = destTable.getNumBuckets();
// Get the positions for sorted and bucketed columns
// For sorted columns, also get the order (ascending/descending) - that should
// also match for this to be converted to a map-only job.
List<Integer> bucketPositions =
getBucketPositions(destTable.getBucketCols(), destTable.getCols());
List<Integer> sortPositions =
getSortPositions(destTable.getSortCols(), destTable.getCols());
List<Integer> sortOrder =
getSortOrder(destTable.getSortCols(), destTable.getCols());
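// e.g., for a destination with schema (a, b, c), clustered by (b) and sorted by (b, c),
// bucketPositions = [1] and sortPositions = [1, 2] (0-based positions in the schema)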
boolean useBucketSortPositions = true;
// Walk up the operator tree: only table scans, filters, selects and SMB map joins are allowed
Operator<? extends OperatorDesc> op = rsOp;
// TableScan will also be followed by a Select Operator. Find the expressions for the
// bucketed/sorted columns for the destination table
List<ExprNodeColumnDesc> sourceTableBucketCols = new ArrayList<ExprNodeColumnDesc>();
List<ExprNodeColumnDesc> sourceTableSortCols = new ArrayList<ExprNodeColumnDesc>();
op = op.getParentOperators().get(0);
while (true) {
if (!(op instanceof TableScanOperator) &&
!(op instanceof FilterOperator) &&
!(op instanceof SelectOperator) &&
!(op instanceof SMBMapJoinOperator)) {
return null;
}
if (op instanceof SMBMapJoinOperator) {
// Bucketing and sorting keys should exactly match
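// (the SMB join output is bucketed and sorted on the same join keys, so the
// destination's bucket and sort columns must coincide)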
if (!(bucketPositions.equals(sortPositions))) {
return null;
}
SMBMapJoinOperator smbOp = (SMBMapJoinOperator) op;
SMBJoinDesc smbJoinDesc = smbOp.getConf();
int posBigTable = smbJoinDesc.getPosBigTable();
// join keys don't match the bucketing keys
List<ExprNodeDesc> keysBigTable = smbJoinDesc.getKeys().get((byte) posBigTable);
if (keysBigTable.size() != bucketPositions.size()) {
return null;
}
if (!validateSMBJoinKeys(smbJoinDesc, sourceTableBucketCols,
sourceTableSortCols, sortOrder)) {
return null;
}
sourceTableBucketCols.clear();
sourceTableSortCols.clear();
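// From here on, track the actual join key columns rather than positions in the
// destination schema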
useBucketSortPositions = false;
for (ExprNodeDesc keyBigTable : keysBigTable) {
if (!(keyBigTable instanceof ExprNodeColumnDesc)) {
return null;
}
sourceTableBucketCols.add((ExprNodeColumnDesc) keyBigTable);
sourceTableSortCols.add((ExprNodeColumnDesc) keyBigTable);
}
// since it is a sort-merge join, only follow the big table
op = op.getParentOperators().get(posBigTable);
} else {
// nothing to be done for filters - the output schema does not change.
if (op instanceof TableScanOperator) {
assert !useBucketSortPositions;
TableScanOperator ts = (TableScanOperator) op;
Table srcTable = ts.getConf().getTableMetadata();
// Insert-only (MM) destination tables are not supported for now.
if (AcidUtils.isInsertOnlyTable(destTable.getParameters())) {
return null;
}
// Find the positions of the bucketed columns in the table corresponding
// to the select list.
// Consider the following scenario:
// T1(key, value1, value2) bucketed/sorted by key into 2 buckets
// T2(dummy, key, value1, value2) bucketed/sorted by key into 2 buckets
// A query like: insert overwrite table T2 select 1, key, value1, value2 from T1
// should be optimized.
// Start with the destination: T2, bucketed/sorted position is [1]
// At the source T1, the column corresponding to that position is [key], which
// maps to column [0] of T1, which is also bucketed/sorted into the same
// number of buckets
List<Integer> newBucketPositions = new ArrayList<Integer>();
for (int pos = 0; pos < bucketPositions.size(); pos++) {
ExprNodeColumnDesc col = sourceTableBucketCols.get(pos);
String colName = col.getColumn();
int bucketPos = findColumnPosition(srcTable.getCols(), colName);
if (bucketPos < 0) {
return null;
}
newBucketPositions.add(bucketPos);
}
// Find the positions/order of the sorted columns in the table corresponding
// to the select list.
List<Integer> newSortPositions = new ArrayList<Integer>();
for (int pos = 0; pos < sortPositions.size(); pos++) {
ExprNodeColumnDesc col = sourceTableSortCols.get(pos);
String colName = col.getColumn();
int sortPos = findColumnPosition(srcTable.getCols(), colName);
if (sortPos < 0) {
return null;
}
newSortPositions.add(sortPos);
}
if (srcTable.isPartitioned()) {
PrunedPartitionList prunedParts =
pGraphContext.getPrunedPartitions(srcTable.getTableName(), ts);
List<Partition> partitions = prunedParts.getNotDeniedPartns();
// Support for dynamic partitions can be added later
// The following is not optimized:
// insert overwrite table T1(ds='1', hr) select key, value, hr from T2 where ds = '1';
// where T1 and T2 are bucketed by the same keys and partitioned by ds. hr
if ((partitions == null) || (partitions.isEmpty()) || (partitions.size() > 1)) {
return null;
}
for (Partition partition : partitions) {
if (!checkPartition(partition, newBucketPositions, newSortPositions, sortOrder,
numBucketsDestination)) {
return null;
}
}
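// All checks passed - remove the reduce sink and read the partition's sorted
// bucket files directly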
removeReduceSink(rsOp, (TableScanOperator) op, fsOp,
partitions.get(0).getSortedPaths());
return null;
} else {
if (!checkTable(srcTable, newBucketPositions, newSortPositions, sortOrder,
numBucketsDestination)) {
return null;
}
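// Non-partitioned source - remove the reduce sink and read the table's sorted
// bucket files directly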
removeReduceSink(rsOp, (TableScanOperator) op, fsOp, srcTable.getSortedPaths());
return null;
}
}
// A select may rename or reorder columns, so recompute the tracked positions against its output
else if (op instanceof SelectOperator) {
SelectOperator selectOp = (SelectOperator) op;
SelectDesc selectDesc = selectOp.getConf();
// Iterate backwards, from the destination table to the top of the tree
// Based on the output column names, get the new columns.
if (!useBucketSortPositions) {
bucketPositions.clear();
sortPositions.clear();
List<String> outputColumnNames = selectDesc.getOutputColumnNames();
for (ExprNodeColumnDesc col : sourceTableBucketCols) {
String colName = col.getColumn();
int colPos = outputColumnNames.indexOf(colName);
if (colPos < 0) {
return null;
}
bucketPositions.add(colPos);
}
for (ExprNodeColumnDesc col : sourceTableSortCols) {
String colName = col.getColumn();
int colPos = outputColumnNames.indexOf(colName);
if (colPos < 0) {
return null;
}
sortPositions.add(colPos);
}
}
// There may be multiple selects - choose the one closest to the table
sourceTableBucketCols.clear();
sourceTableSortCols.clear();
if (selectDesc.getColList().size() < bucketPositions.size()
|| selectDesc.getColList().size() != fsOp.getSchema().getSignature().size()) {
// Some columns in select are pruned. This may happen if those are constants.
// TODO: the best solution is to hook the operator before fs with the select operator.
// See smb_mapjoin_20.q for more details.
return null;
}
// Only plain column references are allowed at the bucketed and sorted positions
for (int pos : bucketPositions) {
if (pos >= selectDesc.getColList().size()) {
// e.g., INSERT OVERWRITE TABLE temp1 SELECT c0, c0 FROM temp2;
// In such a case the select operator has only one instance of c0 while the RS has two,
// so locating the bucket column would fail - bail out.
return null;
}
ExprNodeDesc selectCol = selectDesc.getColList().get(pos);
if (!(selectCol instanceof ExprNodeColumnDesc)) {
return null;
}
sourceTableBucketCols.add((ExprNodeColumnDesc) selectCol);
}
for (int pos : sortPositions) {
if (pos >= selectDesc.getColList().size()) {
return null;
}
ExprNodeDesc selectCol = selectDesc.getColList().get(pos);
if (!(selectCol instanceof ExprNodeColumnDesc)) {
return null;
}
sourceTableSortCols.add((ExprNodeColumnDesc) selectCol);
}
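// From now on, positions must be recomputed against each select's output columns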
useBucketSortPositions = false;
}
op = op.getParentOperators().get(0);
}
}
}