public Object process()

in ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java [2027:2320]


    /**
     * Estimates the output statistics of a join operator and attaches them to it via
     * {@code jop.setStatistics(...)}.
     *
     * <p>Nothing is annotated when {@code isAllParentsContainStatistics(jop)} is false. Otherwise
     * one of two strategies is used:
     * <ul>
     *   <li><b>Column-statistics based</b>: taken when every parent's statistics pass
     *       {@code satisfyPrecondition} and every parent is a {@link ReduceSinkOperator}. The row
     *       count comes from {@code inferPKFKRelationship}, from the cross product when there are
     *       no join keys, or from the NDV-based denominator when the inferred count is -1.</li>
     *   <li><b>Worst case</b>: used otherwise. The row count and data size are either the cross
     *       product (no join keys) or the largest parent scaled by
     *       {@code HIVE_STATS_JOIN_FACTOR}.</li>
     * </ul>
     * In both strategies residual filter expressions are evaluated only when the join has no
     * outer-join condition, and the filtered estimate is applied only when it does not exceed the
     * unfiltered join row count.
     *
     * @param nd the join operator being annotated; cast to {@code CommonJoinOperator}
     * @param stack not used by this method
     * @param procCtx the walker context; cast to {@code AnnotateStatsProcCtx}
     * @param nodeOutputs not used by this method
     * @return always {@code null}
     * @throws SemanticException declared by the signature; presumably propagated from the helper
     *         calls made here (e.g. expression evaluation) -- confirm against those helpers
     */
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        Object... nodeOutputs) throws SemanticException {
      CommonJoinOperator<? extends JoinDesc> jop = (CommonJoinOperator<? extends JoinDesc>) nd;
      List<Operator<? extends OperatorDesc>> parents = jop.getParentOperators();
      AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
      HiveConf conf = aspCtx.getConf();

      if (!isAllParentsContainStatistics(jop)) {
        return null;
      }

      boolean allSatisfyPreCondition = true;
      for (Operator<? extends OperatorDesc> op : parents) {
        if (!satisfyPrecondition(op.getStatistics())) {
          allSatisfyPreCondition = false;
          break;
        }
      }
      // There can be cases where the inputs of the join operator are not ReduceSink operators.
      // The estimation below relies on every input being a ReduceSink (join keys are read from
      // the ReduceSink descriptors), so it must not be attempted for such cases.
      if (allSatisfyPreCondition) {
        for (Operator<? extends OperatorDesc> op : parents) {
          if (!(op instanceof ReduceSinkOperator)) {
            allSatisfyPreCondition = false;
            break;
          }
        }
      }

      if (allSatisfyPreCondition) {

        // statistics object that is the combination of statistics from all
        // relations involved in the JOIN
        Statistics stats = new Statistics();
        Map<Integer, Long> rowCountParents = Maps.newHashMap();
        Map<Integer, Statistics> joinStats = Maps.newHashMap();
        Map<Integer, List<String>> joinKeys = Maps.newHashMap();
        List<Long> rowCounts = Lists.newArrayList();

        // detect if there are multiple attributes in the join key; the number of
        // attributes is taken from the first parent
        ReduceSinkOperator rsOp = (ReduceSinkOperator) parents.get(0);
        List<String> firstParentKeys = StatsUtils.getQualifedReducerKeyNames(rsOp.getConf()
            .getOutputKeyColumnNames());
        int numAttr = firstParentKeys.size();

        // infer PK-FK relationship in single attribute join case
        long inferredRowCount = inferPKFKRelationship(numAttr, parents, jop);

        // collect, per parent position, the join keys, row count and a private copy of the stats
        for (int pos = 0; pos < parents.size(); pos++) {
          ReduceSinkOperator parent = (ReduceSinkOperator) parents.get(pos);
          // work on a clone: the statistics are modified below (NDV update, column renaming)
          Statistics parentStats = parent.getStatistics().clone();
          List<String> keyExprs = StatsUtils.getQualifedReducerKeyNames(parent.getConf()
              .getOutputKeyColumnNames());

          rowCountParents.put(pos, parentStats.getNumRows());
          rowCounts.add(parentStats.getNumRows());

          // qualified key column names of this parent
          joinKeys.put(pos, keyExprs);

          // column statistics for all output columns of this parent
          joinStats.put(pos, parentStats);

          // since new statistics is derived from all relations involved in
          // JOIN, we need to update the state information accordingly
          stats.updateColumnStatsState(parentStats.getColumnStatsState());
        }

        if (numAttr == 0) {
          // It is a cartesian product, row count is easy to infer
          inferredRowCount = 1;
          for (int pos = 0; pos < parents.size(); pos++) {
            inferredRowCount = StatsUtils.safeMult(joinStats.get(pos).getNumRows(), inferredRowCount);
          }
        }

        List<Long> distinctVals = Lists.newArrayList();

        // these ndvs are later used to compute unmatched rows and num of nulls for outer joins
        List<Long> ndvsUnmatched = Lists.newArrayList();
        long denom = 1;
        long distinctUnmatched = 1;
        if (inferredRowCount == -1) {
          // failed to infer PK-FK relationship for row count estimation fall-back on default logic
          // compute denominator  max(V(R,y1), V(S,y1)) * max(V(R,y2), V(S,y2))
          // in case of multi-attribute join
          List<Long> perAttrDVs = Lists.newArrayList();
          // go over each predicate
          for (int idx = 0; idx < numAttr; idx++) {
            for (Integer i : joinKeys.keySet()) {
              String col = joinKeys.get(i).get(idx);
              ColStatistics cs = joinStats.get(i).getColumnStatisticsFromColName(col);
              if (cs != null) {
                perAttrDVs.add(cs.getCountDistint());
              }
            }
            distinctVals.add(getDenominator(perAttrDVs));
            ndvsUnmatched.add(getDenominatorForUnmatchedRows(perAttrDVs));
            perAttrDVs.clear();
          }

          if (numAttr > 1 && conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_CORRELATED_MULTI_KEY_JOINS)) {
            denom = Collections.max(distinctVals);
            distinctUnmatched = denom - ndvsUnmatched.get(distinctVals.indexOf(denom));
          } else {
            // To avoid denominator getting larger and aggressively reducing
            // number of rows, we will ease out denominator.
            denom = StatsUtils.addWithExpDecay(distinctVals);
            distinctUnmatched = denom - StatsUtils.addWithExpDecay(ndvsUnmatched);
          }
        }

        // Update NDV of joined columns to be min(V(R,y), V(S,y))
        updateJoinColumnsNDV(joinKeys, joinStats, numAttr);

        // column statistics from different sources are put together and
        // renamed based on the output schema of the join operator
        Map<String, ExprNodeDesc> colExprMap = jop.getColumnExprMap();
        RowSchema rs = jop.getSchema();
        List<ColStatistics> outColStats = Lists.newArrayList();
        for (ColumnInfo ci : rs.getSignature()) {
          String key = ci.getInternalName();
          ExprNodeDesc end = colExprMap.get(key);
          if (end instanceof ExprNodeColumnDesc) {
            aspCtx.addAffectedColumn((ExprNodeColumnDesc) end);
            String colName = ((ExprNodeColumnDesc) end).getColumn();
            // position of the parent that produces this output column
            int pos = jop.getConf().getReversedExprs().get(key);
            ColStatistics cs = joinStats.get(pos).getColumnStatisticsFromColName(colName);
            if (cs != null) {
              cs.setColumnName(key);
            }
            // cs may be null here; it is still added, exactly as before
            outColStats.add(cs);
          }
        }

        // update join statistics
        stats.setColumnStats(outColStats);

        long joinRowCount;
        long leftUnmatchedRows = 0L;
        long rightUnmatchedRows = 0L;
        if (inferredRowCount != -1) {
          joinRowCount = inferredRowCount;
        } else {
          long innerJoinRowCount = computeRowCountAssumingInnerJoin(rowCounts, denom, jop);
          // the idea is to measure unmatched rows in outer joins by figuring out how many rows didn't match
          if (jop.getConf().getConds().length == 1) {
            // TODO: Consider more than one condition
            JoinCondDesc joinCond = jop.getConf().getConds()[0];
            if (joinCond.getType() == JoinDesc.LEFT_OUTER_JOIN) {
              leftUnmatchedRows = calculateUnmatchedRowsForOuter(conf, rowCountParents.get(0), joinKeys.get(0), joinStats.get(0), distinctUnmatched);
            } else if (joinCond.getType() == JoinDesc.RIGHT_OUTER_JOIN) {
              rightUnmatchedRows = calculateUnmatchedRowsForOuter(conf, rowCountParents.get(1), joinKeys.get(1), joinStats.get(1), distinctUnmatched);
            } else if (joinCond.getType() == JoinDesc.FULL_OUTER_JOIN) {
              leftUnmatchedRows = calculateUnmatchedRowsForOuter(conf, rowCountParents.get(0), joinKeys.get(0), joinStats.get(0), distinctUnmatched);
              rightUnmatchedRows = calculateUnmatchedRowsForOuter(conf, rowCountParents.get(1), joinKeys.get(1), joinStats.get(1), distinctUnmatched);
            }
          }
          // final row computation will consider join type
          joinRowCount = computeFinalRowCount(rowCounts, StatsUtils.safeAdd(innerJoinRowCount, StatsUtils.safeAdd(leftUnmatchedRows, rightUnmatchedRows)), jop);
        }

        // update column statistics
        updateColStats(conf, stats, leftUnmatchedRows, rightUnmatchedRows, joinRowCount, jop, rowCountParents);

        // evaluate filter expression and update statistics
        if (joinRowCount != -1 && residualFilterApplies(jop.getConf())) {
          ExprNodeDesc pred = residualFilterPredicate(jop.getConf());
          long newNumRows = evaluateExpression(stats, pred,
              aspCtx, jop.getSchema().getColumnNames(), jop, stats.getNumRows());
          // update statistics based on column statistics.
          // OR conditions keeps adding the stats independently, this may
          // result in number of rows getting more than the input rows in
          // which case stats need not be updated
          if (newNumRows <= joinRowCount) {
            StatsUtils.updateStats(stats, newNumRows, true, jop);
          }
        }

        stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, jop);
        jop.setStatistics(stats);

        if (LOG.isDebugEnabled()) {
          LOG.debug("[0] STATS-" + jop.toString() + ": " + stats.extendedToString());
        }
      } else {

        // worst case when there are no column statistics
        float joinFactor = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_STATS_JOIN_FACTOR);
        int numParents = parents.size();
        long crossRowCount = 1;
        long crossDataSize = 1;
        long maxRowCount = 0;
        long maxDataSize = 0;
        State statsState = State.NONE;

        for (Operator<? extends OperatorDesc> op : parents) {
          Statistics ps = op.getStatistics();
          statsState = Statistics.inferColumnStatsState(statsState, ps.getBasicStatsState());
          long rowCount = ps.getNumRows();
          long dataSize = ps.getDataSize();
          // Update cross size; both products must use the values from before this parent,
          // hence the temporaries
          long newCrossRowCount = StatsUtils.safeMult(crossRowCount, rowCount);
          long newCrossDataSize = StatsUtils.safeAdd(
              StatsUtils.safeMult(crossDataSize, rowCount),
              StatsUtils.safeMult(dataSize, crossRowCount));
          crossRowCount = newCrossRowCount;
          crossDataSize = newCrossDataSize;
          // Update largest relation
          if (rowCount > maxRowCount) {
            maxRowCount = rowCount;
            maxDataSize = dataSize;
          }
        }

        // detect if there are attributes in join key
        boolean cartesianProduct = false;
        if (parents.get(0) instanceof ReduceSinkOperator) {
          ReduceSinkOperator rsOp = (ReduceSinkOperator) parents.get(0);
          List<String> keyExprs = StatsUtils.getQualifedReducerKeyNames(rsOp.getConf()
              .getOutputKeyColumnNames());
          cartesianProduct = keyExprs.size() == 0;
        } else if (jop instanceof AbstractMapJoinOperator) {
          AbstractMapJoinOperator<? extends MapJoinDesc> mjop =
              (AbstractMapJoinOperator<? extends MapJoinDesc>) jop;
          List<ExprNodeDesc> keyExprs = mjop.getConf().getKeys().values().iterator().next();
          cartesianProduct = keyExprs.size() == 0;
        }

        long newNumRows;
        long newDataSize;
        if (cartesianProduct) {
          // Cartesian product
          newNumRows = crossRowCount;
          newDataSize = crossDataSize;
        } else {
          if (numParents > 1) {
            newNumRows = StatsUtils.safeMult(StatsUtils.safeMult(maxRowCount, (numParents - 1)), joinFactor);
            newDataSize = StatsUtils.safeMult(StatsUtils.safeMult(maxDataSize, (numParents - 1)), joinFactor);
          } else {
            // MUX operator with 1 parent
            newNumRows = StatsUtils.safeMult(maxRowCount, joinFactor);
            newDataSize = StatsUtils.safeMult(maxDataSize, joinFactor);
          }
        }

        Statistics wcStats = new Statistics(newNumRows, newDataSize, 0, 0);
        wcStats.setBasicStatsState(statsState);

        // evaluate filter expression and update statistics
        if (residualFilterApplies(jop.getConf())) {
          long joinRowCount = newNumRows;
          ExprNodeDesc pred = residualFilterPredicate(jop.getConf());
          newNumRows = evaluateExpression(wcStats, pred,
              aspCtx, jop.getSchema().getColumnNames(), jop, wcStats.getNumRows());
          // update only the basic statistics in the absence of column statistics
          if (newNumRows <= joinRowCount) {
            StatsUtils.updateStats(wcStats, newNumRows, false, jop);
          }
        }

        wcStats = applyRuntimeStats(aspCtx.getParseContext().getContext(), wcStats, jop);
        jop.setStatistics(wcStats);

        if (LOG.isDebugEnabled()) {
          LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
        }
      }
      return null;
    }

    /**
     * Tells whether residual filter expressions should be evaluated for the given join: the join
     * must report no outer-join condition and must carry a non-empty residual filter list.
     */
    private boolean residualFilterApplies(JoinDesc joinDesc) {
      return joinDesc.getNoOuterJoin()
          && joinDesc.getResidualFilterExprs() != null
          && !joinDesc.getResidualFilterExprs().isEmpty();
    }

    /**
     * Builds the single predicate for the join's residual filters: the lone expression when there
     * is exactly one, otherwise an AND over all of them. Callers must first check
     * {@link #residualFilterApplies(JoinDesc)}; the list is assumed to be non-empty.
     */
    private ExprNodeDesc residualFilterPredicate(JoinDesc joinDesc) {
      List<ExprNodeDesc> residualFilters = joinDesc.getResidualFilterExprs();
      if (residualFilters.size() > 1) {
        return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
            FunctionRegistry.getGenericUDFForAnd(),
            residualFilters);
      }
      return residualFilters.get(0);
    }