in ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java [254:896]
/**
 * Attempts to answer an aggregation-only query from metastore statistics instead of scanning.
 *
 * <p>The operators matched by the rule are read positionally from {@code stack}:
 * TS(0) - SEL(1) - GBY(2) - RS(3) - GBY(4) - [SEL(5)] - FS. If every aggregation in the first
 * (pre-ReduceSink) GBY is SUM over a constant, COUNT(*), COUNT(constant), COUNT(col), MIN(col) or
 * MAX(col), and the required basic/column statistics are up to date, the single result row is
 * computed here and added to the parse context's {@link FetchTask} (one is created if absent).
 *
 * <p>This is a best-effort optimization. Whenever the plan is not eligible, stats are missing or
 * stale, or an exception is thrown, the method returns {@code null} without optimizing. In that
 * case it also sets {@code stopProcess} on the context and clears the parse context's fetch task,
 * so that no partially-optimized plan (for example one branch of a UNION ALL) survives.
 *
 * @param nd the visited node (not read; the matched operators are taken from {@code stack})
 * @param stack the matched operator path, starting with the {@link TableScanOperator}
 * @param procCtx expected to be a {@code StatsOptimizerProcContext}
 * @param nodeOutputs not read
 * @return always {@code null}
 * @throws SemanticException declared by the processor interface; exceptions raised while
 *     computing the result are caught and logged at debug level instead of propagated
 */
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
    Object... nodeOutputs) throws SemanticException {
  // 1. Do few checks to determine eligibility of optimization
  // 2. look at ExprNodeFuncGenericDesc in select list to see if its min, max, count etc.
  //    If it is
  // 3. Connect to metastore and get the stats
  // 4. Compose rows and add it in FetchWork
  // 5. Delete GBY - RS - GBY - SEL from the pipeline.
  //    NOTE(review): step 5 is not performed in this method; presumably done by the caller.
  StatsOptimizerProcContext soProcCtx = (StatsOptimizerProcContext) procCtx;
  // If the optimization has been stopped for reasons like the query not qualifying or missing
  // stats, do not continue. For example, for
  //   select max(value) from src1 union all select max(value) from src2
  // after union-remove optimization the operator tree becomes
  //   TS[0]->SEL[1]->GBY[2]-RS[3]->GBY[4]->FS[17]
  //   TS[6]->SEL[7]->GBY[8]-RS[9]->GBY[10]->FS[18]
  // If the TS[0] branch for src1 was not optimized because src1 has no column stats,
  // there is no need to process the TS[6] branch.
  if (soProcCtx.stopProcess) {
    return null;
  }
  boolean isOptimized = false;
  try {
    TableScanOperator tsOp = (TableScanOperator) stack.get(0);
    if (tsOp.getNumParent() > 0) {
      // looks like a subq plan.
      return null;
    }
    if (tsOp.getConf().getRowLimit() != -1) {
      // table is sampled. In some situation, we really can leverage row
      // limit. In order to be safe, we do not use it now.
      return null;
    }
    Hive hive = Hive.get(pctx.getConf());
    Table tbl = tsOp.getConf().getTableMetadata();
    boolean isTransactionalTable = AcidUtils.isTransactionalTable(tbl);
    // If the table is transactional, re-fetch it with the transactional flag on so that the
    // validity of its stats can be checked.
    if (isTransactionalTable) {
      tbl = hive.getTable(tbl.getDbName(), tbl.getTableName(), true, true);
    }
    if (!StatsUtils.checkCanProvideStats(tbl)) {
      Logger.info("Table " + tbl.getTableName() + " is external and also could not provide statistics. " +
          "Skip StatsOptimizer.");
      return null;
    }
    if (tbl.isNonNative() && !tbl.getStorageHandler().canProvideBasicStatistics()) {
      Logger.info("Table " + tbl.getTableName() + " is non Native table. Skip StatsOptimizer.");
      return null;
    }
    // Table-level row count. SUM(constant), COUNT(*) and COUNT(constant) are all derived from
    // it, so it is final: the COUNT branch must not overwrite it for later aggregations.
    final Long rowCnt = getRowCnt(tsOp, tbl);
    // If we cannot get correct table stats, neither table nor column stats are useful.
    if (rowCnt == null) {
      return null;
    }
    SelectOperator pselOp = (SelectOperator) stack.get(1);
    for (ExprNodeDesc desc : pselOp.getConf().getColList()) {
      if (!((desc instanceof ExprNodeColumnDesc) || (desc instanceof ExprNodeConstantDesc))) {
        // Probably an expression, cant handle that
        return null;
      }
    }
    Map<String, ExprNodeDesc> exprMap = pselOp.getColumnExprMap();
    // Since we have done an exact match on TS-SEL-GBY-RS-GBY-(SEL)-FS
    // we need not to do any instanceof checks for following.
    GroupByOperator pgbyOp = (GroupByOperator) stack.get(2);
    GbyKeyType pgbyKeyType = getGbyKeyType(pgbyOp);
    if (pgbyKeyType == GbyKeyType.OTHER) {
      return null;
    }
    // rowCnt is known to be non-null here, and rowCnt == 0 means the table is empty.
    // Presumably bail out because a constant-key GROUP BY over an empty table yields no row,
    // while this optimizer always emits exactly one.
    if (pgbyKeyType == GbyKeyType.CONSTANT && rowCnt == 0) {
      return null;
    }
    ReduceSinkOperator rsOp = (ReduceSinkOperator) stack.get(3);
    if (rsOp.getConf().getDistinctColumnIndices().size() > 0) {
      // we can't handle distinct
      return null;
    }
    GroupByOperator cgbyOp = (GroupByOperator) stack.get(4);
    GbyKeyType cgbyKeyType = getGbyKeyType(cgbyOp);
    if (cgbyKeyType == GbyKeyType.OTHER) {
      return null;
    }
    // Same empty-table restriction as for the first GBY above.
    if (cgbyKeyType == GbyKeyType.CONSTANT && rowCnt == 0) {
      return null;
    }
    Operator<?> last = (Operator<?>) stack.get(5);
    SelectOperator cselOp = null;
    // Output position -> constant value for output columns that are constants rather than
    // aggregate results. Insertion order matters for the no-SELECT case below.
    Map<Integer, Object> posToConstant = new LinkedHashMap<>();
    if (last instanceof SelectOperator) {
      cselOp = (SelectOperator) last;
      if (!cselOp.isIdentitySelect()) {
        for (int pos = 0; pos < cselOp.getConf().getColList().size(); pos++) {
          ExprNodeDesc desc = cselOp.getConf().getColList().get(pos);
          if (desc instanceof ExprNodeConstantDesc) {
            // We store the position to the constant value for later use.
            posToConstant.put(pos, ((ExprNodeConstantDesc) desc).getValue());
          } else if (!(desc instanceof ExprNodeColumnDesc)) {
            // Probably an expression, cant handle that
            return null;
          }
        }
      }
      last = (Operator<?>) stack.get(6);
    } else {
      // No SELECT on top: the leading non-aggregate output columns of the GBY must trace back
      // to constants so that they can be emitted in front of the aggregate results.
      GroupByDesc gbyDesc = cgbyOp.getConf();
      int numCols = gbyDesc.getOutputColumnNames().size();
      int aggCols = gbyDesc.getAggregators().size();
      List<String> dpCols = cgbyOp.getSchema().getColumnNames().subList(0, numCols - aggCols);
      for (int i = 0; i < dpCols.size(); i++) {
        ExprNodeDesc end = ExprNodeDescUtils.findConstantExprOrigin(dpCols.get(i), cgbyOp);
        if (!(end instanceof ExprNodeConstantDesc)) {
          // Explicit bail-out instead of an assert: an AssertionError would escape the
          // catch (Exception) below and turn this best-effort optimization into a failure.
          Logger.debug("Non-constant group-by column " + dpCols.get(i)
              + " cannot be computed by metadata optimizer");
          return null;
        }
        posToConstant.put(i, ((ExprNodeConstantDesc) end).getValue());
      }
    }
    FileSinkOperator fsOp = (FileSinkOperator) last;
    if (fsOp.getNumChild() > 0) {
      // looks like a subq plan.
      return null; // todo we can collapse this part of tree into single TS
    }
    // Aggregate results, in the order of the first GBY's aggregators.
    List<Object> oneRow = new ArrayList<Object>();
    AcidUtils.TableSnapshot tableSnapshot =
        AcidUtils.getTableSnapshot(pctx.getConf(), tbl);
    for (AggregationDesc aggr : pgbyOp.getConf().getAggregators()) {
      if (aggr.getDistinct()) {
        // our stats for NDV is approx, not accurate.
        return null;
      }
      // Get the aggregate function matching the name in the query.
      GenericUDAFResolver udaf =
          FunctionRegistry.getGenericUDAFResolver(aggr.getGenericUDAFName());
      if (udaf instanceof GenericUDAFSum) {
        // SUM of a constant is constant * rowCnt; result is long/double/decimal.
        ExprNodeDesc desc = aggr.getParameters().get(0);
        // add null for SUM(1), when the table is empty. Without this, category = LONG, and the
        // result is 0 instead of NULL.
        if (desc instanceof ExprNodeConstantDesc && rowCnt == 0) {
          oneRow.add(null);
          continue;
        }
        PrimitiveCategory category = GenericUDAFSum.getReturnType(desc.getTypeInfo());
        if (category == null) {
          return null;
        }
        String constant;
        if (desc instanceof ExprNodeConstantDesc) {
          constant = ((ExprNodeConstantDesc) desc).getValue().toString();
        } else if (desc instanceof ExprNodeColumnDesc
            && exprMap.get(((ExprNodeColumnDesc) desc).getColumn()) instanceof ExprNodeConstantDesc) {
          // Column produced by the lower SELECT as a constant.
          constant = ((ExprNodeConstantDesc) exprMap.get(((ExprNodeColumnDesc) desc).getColumn()))
              .getValue().toString();
        } else {
          return null;
        }
        if (rowCnt == 0) {
          // Same rule as above for a column that resolves to a constant: SUM over an empty
          // table is NULL, not 0.
          oneRow.add(null);
          continue;
        }
        switch (category) {
        case LONG:
          oneRow.add(Long.parseLong(constant) * rowCnt);
          break;
        case DOUBLE:
          oneRow.add(Double.parseDouble(constant) * rowCnt);
          break;
        case DECIMAL:
          oneRow.add(HiveDecimal.create(constant).multiply(HiveDecimal.create(rowCnt)));
          break;
        default:
          throw new IllegalStateException("never");
        }
      } else if (udaf instanceof GenericUDAFCount) {
        // Always long. Computed into its own local so that rowCnt (the table row count) stays
        // valid for other aggregations in this loop, e.g. a SUM(1) after a COUNT(col).
        long countVal = 0L;
        if (aggr.getParameters().isEmpty()) {
          // Its either count (*) or count() case
          countVal = rowCnt;
        } else if (aggr.getParameters().get(0) instanceof ExprNodeConstantDesc) {
          if (((ExprNodeConstantDesc) aggr.getParameters().get(0)).getValue() != null) {
            // count (1)
            countVal = rowCnt;
          }
          // otherwise it is count(null), should directly return 0.
        } else if ((aggr.getParameters().get(0) instanceof ExprNodeColumnDesc)
            && exprMap.get(((ExprNodeColumnDesc) aggr.getParameters().get(0)).getColumn())
                instanceof ExprNodeConstantDesc) {
          // count(col) where col is a constant projected by the lower SELECT:
          // every row counts unless the constant is null.
          if (((ExprNodeConstantDesc) (exprMap.get(((ExprNodeColumnDesc) aggr.getParameters()
              .get(0)).getColumn()))).getValue() != null) {
            countVal = rowCnt;
          }
        } else {
          // Its count(col) case: row count minus the column's null count.
          ExprNodeColumnDesc desc = (ExprNodeColumnDesc) exprMap.get(((ExprNodeColumnDesc) aggr
              .getParameters().get(0)).getColumn());
          String colName = desc.getColumn();
          StatType type = getType(desc.getTypeString());
          if (!tbl.isPartitioned()) {
            if (!StatsUtils.areBasicStatsUptoDateForQueryAnswering(tbl, tbl.getParameters())) {
              Logger.debug("Stats for table : " + tbl.getTableName() + " are not up to date.");
              return null;
            }
            countVal = Long.parseLong(tbl.getProperty(StatsSetupConst.ROW_COUNT));
            if (!StatsUtils.areColumnStatsUptoDateForQueryAnswering(tbl, tbl.getParameters(), colName)) {
              Logger.debug("Stats for table : " + tbl.getTableName() + " column " + colName
                  + " are not up to date.");
              return null;
            }
            List<ColumnStatisticsObj> stats =
                hive.getMSC().getTableColumnStatistics(
                    tbl.getDbName(), tbl.getTableName(),
                    Lists.newArrayList(colName),
                    Constants.HIVE_ENGINE, tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null);
            if (stats.isEmpty()) {
              Logger.debug("No stats for " + tbl.getTableName() + " column " + colName);
              return null;
            }
            Long nullCnt = getNullcountFor(type, stats.get(0).getStatsData());
            if (nullCnt == null) {
              Logger.debug("Unsupported type: " + desc.getTypeString() + " encountered in "
                  + "metadata optimizer for column : " + colName);
              return null;
            }
            countVal -= nullCnt;
          } else {
            Set<Partition> parts = pctx.getPrunedPartitions(tsOp.getConf().getAlias(), tsOp)
                .getPartitions();
            for (Partition part : parts) {
              if (!StatsUtils.areBasicStatsUptoDateForQueryAnswering(part.getTable(), part.getParameters())) {
                Logger.debug("Stats for part : " + part.getSpec() + " are not up to date.");
                return null;
              }
              countVal += Long.parseLong(part.getParameters().get(StatsSetupConst.ROW_COUNT));
            }
            Collection<List<ColumnStatisticsObj>> result = verifyAndGetPartColumnStats(hive,
                tbl, colName, parts);
            if (result == null) {
              return null; // logging inside
            }
            for (List<ColumnStatisticsObj> statObj : result) {
              ColumnStatisticsData statData = validateSingleColStat(statObj);
              if (statData == null) {
                return null;
              }
              Long nullCnt = getNullcountFor(type, statData);
              if (nullCnt == null) {
                Logger.debug("Unsupported type: " + desc.getTypeString() + " encountered in "
                    + "metadata optimizer for column : " + colName);
                return null;
              }
              countVal -= nullCnt;
            }
          }
        }
        oneRow.add(countVal);
      } else if (udaf instanceof GenericUDAFMax) {
        ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc) exprMap.get(((ExprNodeColumnDesc) aggr.getParameters().get(0)).getColumn());
        String colName = colDesc.getColumn();
        StatType type = getType(colDesc.getTypeString());
        if (!tbl.isPartitioned()) {
          if (!StatsUtils.areColumnStatsUptoDateForQueryAnswering(tbl, tbl.getParameters(), colName)) {
            Logger.debug("Stats for table : " + tbl.getTableName() + " column " + colName
                + " are not up to date.");
            return null;
          }
          List<ColumnStatisticsObj> stats =
              hive.getMSC().getTableColumnStatistics(
                  tbl.getDbName(), tbl.getTableName(),
                  Lists.newArrayList(colName),
                  Constants.HIVE_ENGINE, tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null);
          if (stats.isEmpty()) {
            Logger.debug("No stats for " + tbl.getTableName() + " column " + colName);
            return null;
          }
          ColumnStatisticsData statData = stats.get(0).getStatsData();
          String name = colDesc.getTypeString().toUpperCase();
          switch (type) {
          case Integer: {
            LongSubType subType = LongSubType.valueOf(name);
            LongColumnStatsData lstats = statData.getLongStats();
            if (lstats.isSetHighValue()) {
              oneRow.add(subType.cast(lstats.getHighValue()));
            } else {
              oneRow.add(null);
            }
            break;
          }
          case Double: {
            DoubleSubType subType = DoubleSubType.valueOf(name);
            DoubleColumnStatsData dstats = statData.getDoubleStats();
            if (dstats.isSetHighValue()) {
              oneRow.add(subType.cast(dstats.getHighValue()));
            } else {
              oneRow.add(null);
            }
            break;
          }
          case Date: {
            DateColumnStatsData dstats = statData.getDateStats();
            if (dstats.isSetHighValue()) {
              oneRow.add(DateSubType.DAYS.cast(dstats.getHighValue().getDaysSinceEpoch()));
            } else {
              oneRow.add(null);
            }
            break;
          }
          default:
            // unsupported type
            Logger.debug("Unsupported type: " + colDesc.getTypeString() + " encountered in " +
                "metadata optimizer for column : " + colName);
            return null;
          }
        } else {
          // Partitioned: MAX over the per-partition high values; partitions without a high
          // value are skipped, and NULL is emitted if none has one.
          Set<Partition> parts = pctx.getPrunedPartitions(
              tsOp.getConf().getAlias(), tsOp).getPartitions();
          String name = colDesc.getTypeString().toUpperCase();
          switch (type) {
          case Integer: {
            LongSubType subType = LongSubType.valueOf(name);
            Long maxVal = null;
            Collection<List<ColumnStatisticsObj>> result =
                verifyAndGetPartColumnStats(hive, tbl, colName, parts);
            if (result == null) {
              return null; // logging inside
            }
            for (List<ColumnStatisticsObj> statObj : result) {
              ColumnStatisticsData statData = validateSingleColStat(statObj);
              if (statData == null) {
                return null;
              }
              LongColumnStatsData lstats = statData.getLongStats();
              if (!lstats.isSetHighValue()) {
                continue;
              }
              long curVal = lstats.getHighValue();
              maxVal = maxVal == null ? curVal : Math.max(maxVal, curVal);
            }
            if (maxVal != null) {
              oneRow.add(subType.cast(maxVal));
            } else {
              oneRow.add(null);
            }
            break;
          }
          case Double: {
            DoubleSubType subType = DoubleSubType.valueOf(name);
            Double maxVal = null;
            Collection<List<ColumnStatisticsObj>> result =
                verifyAndGetPartColumnStats(hive, tbl, colName, parts);
            if (result == null) {
              return null; // logging inside
            }
            for (List<ColumnStatisticsObj> statObj : result) {
              ColumnStatisticsData statData = validateSingleColStat(statObj);
              if (statData == null) {
                return null;
              }
              DoubleColumnStatsData dstats = statData.getDoubleStats();
              if (!dstats.isSetHighValue()) {
                continue;
              }
              double curVal = dstats.getHighValue();
              maxVal = maxVal == null ? curVal : Math.max(maxVal, curVal);
            }
            if (maxVal != null) {
              oneRow.add(subType.cast(maxVal));
            } else {
              oneRow.add(null);
            }
            break;
          }
          case Date: {
            Long maxVal = null;
            Collection<List<ColumnStatisticsObj>> result =
                verifyAndGetPartColumnStats(hive, tbl, colName, parts);
            if (result == null) {
              return null; // logging inside
            }
            for (List<ColumnStatisticsObj> statObj : result) {
              ColumnStatisticsData statData = validateSingleColStat(statObj);
              if (statData == null) {
                return null;
              }
              DateColumnStatsData dstats = statData.getDateStats();
              if (!dstats.isSetHighValue()) {
                continue;
              }
              long curVal = dstats.getHighValue().getDaysSinceEpoch();
              maxVal = maxVal == null ? curVal : Math.max(maxVal, curVal);
            }
            if (maxVal != null) {
              oneRow.add(DateSubType.DAYS.cast(maxVal));
            } else {
              oneRow.add(null);
            }
            break;
          }
          default:
            Logger.debug("Unsupported type: " + colDesc.getTypeString() + " encountered in " +
                "metadata optimizer for column : " + colName);
            return null;
          }
        }
      } else if (udaf instanceof GenericUDAFMin) {
        ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc) exprMap.get(((ExprNodeColumnDesc) aggr.getParameters().get(0)).getColumn());
        String colName = colDesc.getColumn();
        StatType type = getType(colDesc.getTypeString());
        if (!tbl.isPartitioned()) {
          if (!StatsUtils.areColumnStatsUptoDateForQueryAnswering(tbl, tbl.getParameters(), colName)) {
            Logger.debug("Stats for table : " + tbl.getTableName() + " column " + colName
                + " are not up to date.");
            return null;
          }
          List<ColumnStatisticsObj> stats =
              hive.getMSC().getTableColumnStatistics(
                  tbl.getDbName(), tbl.getTableName(), Lists.newArrayList(colName),
                  Constants.HIVE_ENGINE, tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null);
          // Same guard as the MAX branch: previously an empty list led to an
          // IndexOutOfBoundsException on get(0).
          if (stats.isEmpty()) {
            Logger.debug("No stats for " + tbl.getTableName() + " column " + colName);
            return null;
          }
          ColumnStatisticsData statData = stats.get(0).getStatsData();
          String name = colDesc.getTypeString().toUpperCase();
          switch (type) {
          case Integer: {
            LongSubType subType = LongSubType.valueOf(name);
            LongColumnStatsData lstats = statData.getLongStats();
            if (lstats.isSetLowValue()) {
              oneRow.add(subType.cast(lstats.getLowValue()));
            } else {
              oneRow.add(null);
            }
            break;
          }
          case Double: {
            DoubleSubType subType = DoubleSubType.valueOf(name);
            DoubleColumnStatsData dstats = statData.getDoubleStats();
            if (dstats.isSetLowValue()) {
              oneRow.add(subType.cast(dstats.getLowValue()));
            } else {
              oneRow.add(null);
            }
            break;
          }
          case Date: {
            DateColumnStatsData dstats = statData.getDateStats();
            if (dstats.isSetLowValue()) {
              oneRow.add(DateSubType.DAYS.cast(dstats.getLowValue().getDaysSinceEpoch()));
            } else {
              oneRow.add(null);
            }
            break;
          }
          default: // unsupported type
            Logger.debug("Unsupported type: " + colDesc.getTypeString() + " encountered in " +
                "metadata optimizer for column : " + colName);
            return null;
          }
        } else {
          // Partitioned: MIN over the per-partition low values; partitions without a low
          // value are skipped, and NULL is emitted if none has one.
          Set<Partition> parts = pctx.getPrunedPartitions(tsOp.getConf().getAlias(), tsOp).getPartitions();
          String name = colDesc.getTypeString().toUpperCase();
          switch (type) {
          case Integer: {
            LongSubType subType = LongSubType.valueOf(name);
            Long minVal = null;
            Collection<List<ColumnStatisticsObj>> result =
                verifyAndGetPartColumnStats(hive, tbl, colName, parts);
            if (result == null) {
              return null; // logging inside
            }
            for (List<ColumnStatisticsObj> statObj : result) {
              ColumnStatisticsData statData = validateSingleColStat(statObj);
              if (statData == null) {
                return null;
              }
              LongColumnStatsData lstats = statData.getLongStats();
              if (!lstats.isSetLowValue()) {
                continue;
              }
              long curVal = lstats.getLowValue();
              minVal = minVal == null ? curVal : Math.min(minVal, curVal);
            }
            if (minVal != null) {
              oneRow.add(subType.cast(minVal));
            } else {
              oneRow.add(null);
            }
            break;
          }
          case Double: {
            DoubleSubType subType = DoubleSubType.valueOf(name);
            Double minVal = null;
            Collection<List<ColumnStatisticsObj>> result =
                verifyAndGetPartColumnStats(hive, tbl, colName, parts);
            if (result == null) {
              return null; // logging inside
            }
            for (List<ColumnStatisticsObj> statObj : result) {
              ColumnStatisticsData statData = validateSingleColStat(statObj);
              if (statData == null) {
                return null;
              }
              DoubleColumnStatsData dstats = statData.getDoubleStats();
              if (!dstats.isSetLowValue()) {
                continue;
              }
              double curVal = dstats.getLowValue();
              minVal = minVal == null ? curVal : Math.min(minVal, curVal);
            }
            if (minVal != null) {
              oneRow.add(subType.cast(minVal));
            } else {
              oneRow.add(null);
            }
            break;
          }
          case Date: {
            Long minVal = null;
            Collection<List<ColumnStatisticsObj>> result =
                verifyAndGetPartColumnStats(hive, tbl, colName, parts);
            if (result == null) {
              return null; // logging inside
            }
            for (List<ColumnStatisticsObj> statObj : result) {
              ColumnStatisticsData statData = validateSingleColStat(statObj);
              if (statData == null) {
                return null;
              }
              DateColumnStatsData dstats = statData.getDateStats();
              if (!dstats.isSetLowValue()) {
                continue;
              }
              long curVal = dstats.getLowValue().getDaysSinceEpoch();
              minVal = minVal == null ? curVal : Math.min(minVal, curVal);
            }
            if (minVal != null) {
              oneRow.add(DateSubType.DAYS.cast(minVal));
            } else {
              oneRow.add(null);
            }
            break;
          }
          default: // unsupported type
            Logger.debug("Unsupported type: " + colDesc.getTypeString() + " encountered in " +
                "metadata optimizer for column : " + colName);
            return null;
          }
        }
      } else { // Unsupported aggregation.
        Logger.debug("Unsupported aggregation for metadata optimizer: "
            + aggr.getGenericUDAFName());
        return null;
      }
    }
    List<List<Object>> allRows = new ArrayList<List<Object>>();
    List<String> colNames = new ArrayList<String>();
    List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
    if (cselOp == null) {
      // Output schema is the second GBY's: constant leading columns, then aggregate results.
      List<Object> oneRowWithConstant = new ArrayList<>();
      oneRowWithConstant.addAll(posToConstant.values());
      oneRowWithConstant.addAll(oneRow);
      allRows.add(oneRowWithConstant);
      for (ColumnInfo colInfo : cgbyOp.getSchema().getSignature()) {
        colNames.add(colInfo.getInternalName());
        ois.add(TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(colInfo.getType()));
      }
    } else {
      // in return path, we may have aggr($f0), aggr($f1) in GBY
      // and then select aggr($f1), aggr($f0) in SEL.
      // Thus we need to use colExp to find out which position is
      // corresponding to which position.
      Map<String, Integer> nameToIndex = new HashMap<>();
      for (int index = 0; index < cgbyOp.getConf().getOutputColumnNames().size(); index++) {
        nameToIndex.put(cgbyOp.getConf().getOutputColumnNames().get(index), index);
      }
      List<String> outputColumnNames = cselOp.getConf().getOutputColumnNames();
      Map<Integer, Integer> cselOpTocgbyOp = new HashMap<>();
      for (int index = 0; index < outputColumnNames.size(); index++) {
        if (!posToConstant.containsKey(index)) {
          String outputColumnName = outputColumnNames.get(index);
          ExprNodeColumnDesc exprColumnNodeDesc = (ExprNodeColumnDesc) cselOp
              .getColumnExprMap().get(outputColumnName);
          cselOpTocgbyOp.put(index, nameToIndex.get(exprColumnNodeDesc.getColumn()));
        }
      }
      List<Object> oneRowWithConstant = new ArrayList<>();
      for (int pos = 0; pos < cselOp.getSchema().getSignature().size(); pos++) {
        if (posToConstant.containsKey(pos)) {
          // This position is a constant.
          oneRowWithConstant.add(posToConstant.get(pos));
        } else {
          // This position is an aggregation.
          // As we store in oneRow only the aggregate results, we need to adjust to the correct
          // position if there are keys in the GBy operator.
          oneRowWithConstant.add(oneRow.get(cselOpTocgbyOp.get(pos) - cgbyOp.getConf().getKeys().size()));
        }
        ColumnInfo colInfo = cselOp.getSchema().getSignature().get(pos);
        colNames.add(colInfo.getInternalName());
        ois.add(TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(colInfo.getType()));
      }
      allRows.add(oneRowWithConstant);
    }
    // Reuse an existing fetch task (e.g. one created by another UNION ALL branch) and append
    // this row to it; otherwise create a new one.
    FetchWork fWork = null;
    FetchTask fTask = pctx.getFetchTask();
    if (fTask != null) {
      fWork = fTask.getWork();
      fWork.getRowsComputedUsingStats().addAll(allRows);
    } else {
      StandardStructObjectInspector sOI = ObjectInspectorFactory.
          getStandardStructObjectInspector(colNames, ois);
      fWork = new FetchWork(allRows, sOI);
      fTask = (FetchTask) TaskFactory.get(fWork);
      pctx.setFetchTask(fTask);
    }
    fWork.setLimit(fWork.getRowsComputedUsingStats().size());
    isOptimized = true;
    return null;
  } catch (Exception e) {
    // this is best effort optimization, bail out in error conditions and
    // try generate and execute slower plan
    Logger.debug("Failed to optimize using metadata optimizer", e);
    return null;
  } finally {
    // If StatOptimization is not applied for any reason, the FetchTask should still not have been set
    if (!isOptimized) {
      soProcCtx.stopProcess = true;
      pctx.setFetchTask(null);
    }
  }
}