in ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java [2027:2320]
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
long newNumRows = 0;
CommonJoinOperator<? extends JoinDesc> jop = (CommonJoinOperator<? extends JoinDesc>) nd;
List<Operator<? extends OperatorDesc>> parents = jop.getParentOperators();
int numAttr = 1;
AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
HiveConf conf = aspCtx.getConf();
boolean allSatisfyPreCondition = true;
if (!isAllParentsContainStatistics(jop)) {
return null;
}
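// every parent must carry statistics usable for estimation; roughly,
// satisfyPrecondition() requires complete basic stats plus available
// column stats (a sketch of the contract; see its definition)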
for (Operator<? extends OperatorDesc> op : parents) {
if (!satisfyPrecondition(op.getStatistics())) {
allSatisfyPreCondition = false;
break;
}
}
// there could be cases where the join operator's inputs are not ReduceSinks.
// Since the following statistics estimation relies on the join operator having
// ReduceSink inputs, it will not work for such cases, so we should not try to estimate stats
if (allSatisfyPreCondition) {
for (int pos = 0; pos < parents.size(); pos++) {
if (!(jop.getParentOperators().get(pos) instanceof ReduceSinkOperator)) {
allSatisfyPreCondition = false;
break;
}
}
}
if (allSatisfyPreCondition) {
// statistics object that is combination of statistics from all
// relations involved in JOIN
Statistics stats = new Statistics();
int numParent = parents.size();
Map<Integer, Long> rowCountParents = Maps.newHashMap();
Map<Integer, Statistics> joinStats = Maps.newHashMap();
Map<Integer, List<String>> joinKeys = Maps.newHashMap();
List<Long> rowCounts = Lists.newArrayList();
// detect whether there are multiple attributes in the join key
ReduceSinkOperator rsOp = (ReduceSinkOperator) jop.getParentOperators().get(0);
List<String> keyExprs = StatsUtils.getQualifedReducerKeyNames(rsOp.getConf()
.getOutputKeyColumnNames());
numAttr = keyExprs.size();
// infer PK-FK relationship in the single-attribute join case
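// (sketch of the contract: when one side's join key looks like a primary key
// and the other side's like a foreign key, e.g. a dimension-fact join, the
// cardinality can be derived from the FK side; -1 means no PK-FK relationship
// could be inferred and the default NDV-based logic below is used)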
long inferredRowCount = inferPKFKRelationship(numAttr, parents, jop);
// get the join keys from parent ReduceSink operators
for (int pos = 0; pos < parents.size(); pos++) {
ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos);
Statistics parentStats = parent.getStatistics().clone();
keyExprs = StatsUtils.getQualifedReducerKeyNames(parent.getConf()
.getOutputKeyColumnNames());
rowCountParents.put(pos, parentStats.getNumRows());
rowCounts.add(parentStats.getNumRows());
// record the qualified join key column names for this input position
joinKeys.put(pos, keyExprs);
// keep the parent statistics; they carry the column statistics for the output columns
joinStats.put(pos, parentStats);
// since new statistics is derived from all relations involved in
// JOIN, we need to update the state information accordingly
stats.updateColumnStatsState(parentStats.getColumnStatsState());
}
if (numAttr == 0) {
// it is a cartesian product; the row count is the product of the input row counts
inferredRowCount = 1;
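// e.g., inputs of 10 and 20 rows yield 10 * 20 = 200 rows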
for (int pos = 0; pos < parents.size(); pos++) {
inferredRowCount = StatsUtils.safeMult(joinStats.get(pos).getNumRows(), inferredRowCount);
}
}
List<Long> distinctVals = Lists.newArrayList();
// these NDVs are later used to compute unmatched rows and the number of nulls for outer joins
List<Long> ndvsUnmatched = Lists.newArrayList();
long denom = 1;
long distinctUnmatched = 1;
if (inferredRowCount == -1) {
// failed to infer a PK-FK relationship for row count estimation; fall back on the default logic
// compute denominator max(V(R,y1), V(S,y1)) * max(V(R,y2), V(S,y2))
// in case of multi-attribute join
List<Long> perAttrDVs = Lists.newArrayList();
// go over each join key attribute (each corresponds to one equality predicate)
for (int idx = 0; idx < numAttr; idx++) {
for (Integer i : joinKeys.keySet()) {
String col = joinKeys.get(i).get(idx);
ColStatistics cs = joinStats.get(i).getColumnStatisticsFromColName(col);
if (cs != null) {
perAttrDVs.add(cs.getCountDistint());
}
}
distinctVals.add(getDenominator(perAttrDVs));
ndvsUnmatched.add(getDenominatorForUnmatchedRows(perAttrDVs));
perAttrDVs.clear();
}
if (numAttr > 1 && conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_CORRELATED_MULTI_KEY_JOINS)) {
denom = Collections.max(distinctVals);
distinctUnmatched = denom - ndvsUnmatched.get(distinctVals.indexOf(denom));
} else {
// To avoid the denominator growing too large and aggressively reducing the
// number of rows, ease the denominator with exponential decay.
denom = StatsUtils.addWithExpDecay(distinctVals);
distinctUnmatched = denom - StatsUtils.addWithExpDecay(ndvsUnmatched);
}
}
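// Illustration (hypothetical NDVs): for a two-attribute join with
// max(V(R,y1), V(S,y1)) = 1000 and max(V(R,y2), V(S,y2)) = 50, the
// correlated-keys branch uses denom = max(1000, 50) = 1000, while the
// default branch combines both values but damps the smaller ones
// (addWithExpDecay), keeping the denominator well below the full
// product 1000 * 50.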
// Update NDV of joined columns to be min(V(R,y), V(S,y))
updateJoinColumnsNDV(joinKeys, joinStats, numAttr);
// column statistics from the different inputs are put together and
// renamed based on the output schema of the join operator
Map<String, ExprNodeDesc> colExprMap = jop.getColumnExprMap();
RowSchema rs = jop.getSchema();
List<ColStatistics> outColStats = Lists.newArrayList();
for (ColumnInfo ci : rs.getSignature()) {
String key = ci.getInternalName();
ExprNodeDesc end = colExprMap.get(key);
if (end instanceof ExprNodeColumnDesc) {
aspCtx.addAffectedColumn((ExprNodeColumnDesc) end);
String colName = ((ExprNodeColumnDesc) end).getColumn();
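// getReversedExprs() maps the output column's internal name to the
// position of the parent input it originates from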
int pos = jop.getConf().getReversedExprs().get(key);
ColStatistics cs = joinStats.get(pos).getColumnStatisticsFromColName(colName);
if (cs != null) {
cs.setColumnName(key);
}
outColStats.add(cs);
}
}
// update join statistics
stats.setColumnStats(outColStats);
long joinRowCount;
long leftUnmatchedRows = 0L;
long rightUnmatchedRows = 0L;
if (inferredRowCount != -1) {
joinRowCount = inferredRowCount;
} else {
long innerJoinRowCount = computeRowCountAssumingInnerJoin(rowCounts, denom, jop);
// for outer joins, estimate how many rows on the preserved side(s) found no match and add them to the inner join estimate
if (jop.getConf().getConds().length == 1) {
// TODO: Consider more than one condition
JoinCondDesc joinCond = jop.getConf().getConds()[0];
if (joinCond.getType() == JoinDesc.LEFT_OUTER_JOIN) {
leftUnmatchedRows = calculateUnmatchedRowsForOuter(conf, rowCountParents.get(0), joinKeys.get(0), joinStats.get(0), distinctUnmatched);
} else if (joinCond.getType() == JoinDesc.RIGHT_OUTER_JOIN) {
rightUnmatchedRows = calculateUnmatchedRowsForOuter(conf, rowCountParents.get(1), joinKeys.get(1), joinStats.get(1), distinctUnmatched);
} else if (joinCond.getType() == JoinDesc.FULL_OUTER_JOIN) {
leftUnmatchedRows = calculateUnmatchedRowsForOuter(conf, rowCountParents.get(0), joinKeys.get(0), joinStats.get(0), distinctUnmatched);
rightUnmatchedRows = calculateUnmatchedRowsForOuter(conf, rowCountParents.get(1), joinKeys.get(1), joinStats.get(1), distinctUnmatched);
}
}
// final row computation will consider join type
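// (e.g., hypothetical numbers: a LEFT OUTER JOIN with an inner estimate of
// 800 matched rows and 200 estimated unmatched left rows yields 1000;
// computeFinalRowCount() then adjusts the combined estimate per join type)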
joinRowCount = computeFinalRowCount(rowCounts, StatsUtils.safeAdd(innerJoinRowCount, StatsUtils.safeAdd(leftUnmatchedRows, rightUnmatchedRows)), jop);
}
// update column statistics
updateColStats(conf, stats, leftUnmatchedRows, rightUnmatchedRows, joinRowCount, jop, rowCountParents);
// evaluate filter expression and update statistics
if (joinRowCount != -1 && jop.getConf().getNoOuterJoin() &&
jop.getConf().getResidualFilterExprs() != null &&
!jop.getConf().getResidualFilterExprs().isEmpty()) {
ExprNodeDesc pred;
if (jop.getConf().getResidualFilterExprs().size() > 1) {
pred = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
FunctionRegistry.getGenericUDFForAnd(),
jop.getConf().getResidualFilterExprs());
} else {
pred = jop.getConf().getResidualFilterExprs().get(0);
}
// evaluate filter expression and update statistics
newNumRows = evaluateExpression(stats, pred,
aspCtx, jop.getSchema().getColumnNames(), jop, stats.getNumRows());
// update statistics based on column statistics.
// OR conditions keep adding the stats independently; this may
// result in the number of rows exceeding the input rows, in
// which case the stats need not be updated
if (newNumRows <= joinRowCount) {
StatsUtils.updateStats(stats, newNumRows, true, jop);
}
}
stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, jop);
jop.setStatistics(stats);
if (LOG.isDebugEnabled()) {
LOG.debug("[0] STATS-" + jop.toString() + ": " + stats.extendedToString());
}
} else {
// worst-case estimate when there are no column statistics
float joinFactor = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_STATS_JOIN_FACTOR);
int numParents = parents.size();
long crossRowCount = 1;
long crossDataSize = 1;
long maxRowCount = 0;
long maxDataSize = 0;
State statsState = State.NONE;
for (Operator<? extends OperatorDesc> op : parents) {
Statistics ps = op.getStatistics();
statsState = Statistics.inferColumnStatsState(statsState, ps.getBasicStatsState());
long rowCount = ps.getNumRows();
long dataSize = ps.getDataSize();
// Update cross size
long newCrossRowCount = StatsUtils.safeMult(crossRowCount, rowCount);
long newCrossDataSize = StatsUtils.safeAdd(
StatsUtils.safeMult(crossDataSize, rowCount),
StatsUtils.safeMult(dataSize, crossRowCount));
crossRowCount = newCrossRowCount;
crossDataSize = newCrossDataSize;
// Update largest relation
if (rowCount > maxRowCount) {
maxRowCount = rowCount;
maxDataSize = dataSize;
}
}
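// The recurrence above builds the cross-product size incrementally: each
// already-combined row is replicated once per row of the new input
// (crossDataSize * rowCount), and each new row once per already-combined
// row (dataSize * crossRowCount).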
long newDataSize;
// detect whether the join key has any attributes; if none, it is a cartesian product
boolean cartesianProduct = false;
if (jop.getParentOperators().get(0) instanceof ReduceSinkOperator) {
ReduceSinkOperator rsOp = (ReduceSinkOperator) jop.getParentOperators().get(0);
List<String> keyExprs = StatsUtils.getQualifedReducerKeyNames(rsOp.getConf()
.getOutputKeyColumnNames());
cartesianProduct = keyExprs.isEmpty();
} else if (jop instanceof AbstractMapJoinOperator) {
AbstractMapJoinOperator<? extends MapJoinDesc> mjop =
(AbstractMapJoinOperator<? extends MapJoinDesc>) jop;
List<ExprNodeDesc> keyExprs = mjop.getConf().getKeys().values().iterator().next();
cartesianProduct = keyExprs.isEmpty();
}
if (cartesianProduct) {
// Cartesian product
newNumRows = crossRowCount;
newDataSize = crossDataSize;
} else {
if (numParents > 1) {
newNumRows = StatsUtils.safeMult(StatsUtils.safeMult(maxRowCount, (numParents - 1)), joinFactor);
newDataSize = StatsUtils.safeMult(StatsUtils.safeMult(maxDataSize, (numParents - 1)), joinFactor);
} else {
// MUX operator with 1 parent
newNumRows = StatsUtils.safeMult(maxRowCount, joinFactor);
newDataSize = StatsUtils.safeMult(maxDataSize, joinFactor);
}
}
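// Illustration (assuming the default hive.stats.join.factor of 1.1): a 2-way
// join whose larger input has 1,000,000 rows gets a worst-case estimate of
// 1,000,000 * (2 - 1) * 1.1 = 1,100,000 rows.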
Statistics wcStats = new Statistics(newNumRows, newDataSize, 0, 0);
wcStats.setBasicStatsState(statsState);
// evaluate filter expression and update statistics
if (jop.getConf().getNoOuterJoin() &&
jop.getConf().getResidualFilterExprs() != null &&
!jop.getConf().getResidualFilterExprs().isEmpty()) {
long joinRowCount = newNumRows;
ExprNodeDesc pred;
if (jop.getConf().getResidualFilterExprs().size() > 1) {
pred = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
FunctionRegistry.getGenericUDFForAnd(),
jop.getConf().getResidualFilterExprs());
} else {
pred = jop.getConf().getResidualFilterExprs().get(0);
}
// evaluate filter expression and update statistics
newNumRows = evaluateExpression(wcStats, pred,
aspCtx, jop.getSchema().getColumnNames(), jop, wcStats.getNumRows());
// update only the basic statistics in the absence of column statistics
if (newNumRows <= joinRowCount) {
StatsUtils.updateStats(wcStats, newNumRows, false, jop);
}
}
wcStats = applyRuntimeStats(aspCtx.getParseContext().getContext(), wcStats, jop);
jop.setStatistics(wcStats);
if (LOG.isDebugEnabled()) {
LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
}
}
return null;
}