private List aggrStatsUseDB()

in standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java [1956:2254]


  /**
   * Aggregates partition-level column statistics of one table into one
   * {@code ColumnStatisticsObj} per column, using SQL queries against {@code PART_COL_STATS}.
   *
   * <p>When {@code areAllPartsFound} is true, a single grouped query aggregates every requested
   * column. Otherwise the method first counts, per column, how many of the requested partitions
   * have a stats row:
   * <ul>
   *   <li>columns whose count equals {@code partNames.size()} or is below 2 are aggregated
   *       directly, without extrapolation;</li>
   *   <li>the remaining columns are extrapolated: sum-type stats are scaled up by
   *       {@code partNames.size() / count}, min/max-type stats are handed to
   *       {@code LinearExtrapolatePartStatus}, and avg-type stats are averaged over the
   *       partitions that do have stats.</li>
   * </ul>
   *
   * @param catName catalog name, bound to {@code DBS."CTLG_NAME"}
   * @param dbName database name, bound to {@code DBS."NAME"}
   * @param tableName table name, bound to {@code TBLS."TBL_NAME"}
   * @param partNames names of the partitions to aggregate over
   * @param colNames names of the columns to aggregate
   * @param engine value matched against the {@code "ENGINE"} column
   * @param areAllPartsFound whether the caller determined that no extrapolation is required
   * @param useDensityFunctionForNDVEstimation forwarded to {@code prepareCSObjWithAdjustedNDV}
   * @param ndvTuner forwarded to {@code prepareCSObjWithAdjustedNDV}
   * @return the aggregated statistics; an empty list as soon as any query yields a null result
   * @throws MetaException propagated from the query execution and result-conversion helpers
   */
  private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String dbName,
      String tableName, List<String> partNames, List<String> colNames, String engine,
      boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
    // TODO: all the extrapolation logic should be moved out of this class,
    // only mechanical data retrieval should remain here.
    String fromAndTableFilter = aggrStatsFromAndTableFilter();
    String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
        // The following data is used to compute a partitioned table's NDV based
        // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
        // accurately derived from partition NDVs, because the domains of the column values in two
        // partitions can overlap. If there is no overlap then the global NDV is just the sum
        // of the partition NDVs (UpperBound). But if there is some overlap then the
        // global NDV can be anywhere between the sum of the partition NDVs (no overlap)
        // and the NDV of a single partition (the domain of the column values in all other
        // partitions is a subset of the domain in that partition) (LowerBound).
        // Under a uniform distribution, we can roughly estimate the global
        // NDV by leveraging the min/max values.
        // We also guarantee that the estimation makes sense by comparing it to the
        // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
        // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
        + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
        + "sum(\"NUM_DISTINCTS\")" + fromAndTableFilter;
    String queryText = null;
    long start = 0;
    long end = 0;

    boolean doTrace = LOG.isDebugEnabled();
    if (areAllPartsFound) {
      // Stats exist for every column of every partition, so no extrapolation is needed:
      // one grouped query aggregates everything.
      queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
          + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
          + " and \"ENGINE\" = ? "
          + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
      start = doTrace ? System.nanoTime() : 0;
      try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
        Object qResult = executeWithArray(query.getInnerQuery(),
            prepareParams(catName, dbName, tableName, partNames, colNames,
                engine), queryText);
        if (qResult == null) {
          return Collections.emptyList();
        }
        end = doTrace ? System.nanoTime() : 0;
        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
        List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
        List<ColumnStatisticsObj> colStats = new ArrayList<>(list.size());
        for (Object[] row : list) {
          colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
              useDensityFunctionForNDVEstimation, ndvTuner));
          Deadline.checkTimeout();
        }
        return colStats;
      }
    } else {
      // Extrapolation is needed for some columns.
      // In this case, at least one column's stats are missing for some partition.
      // We need to extrapolate that partition based on the other partitions.
      List<ColumnStatisticsObj> colStats = new ArrayList<>(colNames.size());
      // Step 1: per column, count how many of the requested partitions have a stats row.
      queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(" + PART_COL_STATS + ".\"PART_ID\") "
          + fromAndTableFilter
          + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
          + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
          + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
          + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " + PART_COL_STATS + ".\"COLUMN_TYPE\"";
      start = doTrace ? System.nanoTime() : 0;
      List<String> noExtraColumnNames = new ArrayList<>();
      // column name -> {column type, number of partitions that have stats (as a string)}
      Map<String, String[]> extraColumnNameTypeParts = new HashMap<>();
      try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
        Object qResult = executeWithArray(query.getInnerQuery(),
            prepareParams(catName, dbName, tableName, partNames, colNames,
                engine), queryText);
        end = doTrace ? System.nanoTime() : 0;
        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
        if (qResult == null) {
          return Collections.emptyList();
        }

        List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
        for (Object[] row : list) {
          String colName = (String) row[0];
          String colType = (String) row[1];
          // Extrapolation is not needed for this column if the number of partitions
          // with stats equals partNames.size().
          // Extrapolation is not possible for this column if fewer than 2 partitions
          // have stats.
          long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]);
          if (count == partNames.size() || count < 2) {
            noExtraColumnNames.add(colName);
          } else {
            extraColumnNameTypeParts.put(colName, new String[] {colType, String.valueOf(count)});
          }
          Deadline.checkTimeout();
        }
      }
      // Step 2: plain aggregation for the columns in noExtraColumnNames.
      List<Object[]> list;
      if (!noExtraColumnNames.isEmpty()) {
        // The partition filter must go through PARTITIONS."PART_NAME", exactly like every other
        // query in this method; the partition name is resolved through the join on PART_ID.
        queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
            + makeParams(noExtraColumnNames.size()) + ")"
            + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
            + " and \"ENGINE\" = ? "
            + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
        start = doTrace ? System.nanoTime() : 0;

        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
          Object qResult = executeWithArray(query.getInnerQuery(),
              prepareParams(catName, dbName, tableName, partNames, noExtraColumnNames, engine), queryText);
          if (qResult == null) {
            return Collections.emptyList();
          }
          list = MetastoreDirectSqlUtils.ensureList(qResult);
          for (Object[] row : list) {
            colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
                useDensityFunctionForNDVEstimation, ndvTuner));
            Deadline.checkTimeout();
          }
          end = doTrace ? System.nanoTime() : 0;
          MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
        }
      }
      // Step 3: extrapolation for the columns in extraColumnNameTypeParts.
      if (!extraColumnNameTypeParts.isEmpty()) {
        // give a sequence number to all the partitions
        Map<String, Integer> indexMap = new HashMap<>();
        for (int index = 0; index < partNames.size(); index++) {
          indexMap.put(partNames.get(index), index);
        }
        // get the sums for all columns at once to reduce the number of queries
        Map<String, Map<Integer, Object>> sumMap = new HashMap<>();
        queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
            + fromAndTableFilter
            + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size()) + ")"
            + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
            + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
            + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\"";
        start = doTrace ? System.nanoTime() : 0;
        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
          List<String> extraColumnNames = new ArrayList<>(extraColumnNameTypeParts.keySet());
          Object qResult = executeWithArray(query.getInnerQuery(),
              prepareParams(catName, dbName, tableName, partNames,
                  extraColumnNames, engine), queryText);
          if (qResult == null) {
            return Collections.emptyList();
          }
          list = MetastoreDirectSqlUtils.ensureList(qResult);
          // see the indexes for colstats in IExtrapolatePartStatus
          Integer[] sumIndex = new Integer[] {6, 10, 11, 15};
          for (Object[] row : list) {
            Map<Integer, Object> indexToObject = new HashMap<>();
            for (int ind = 1; ind < row.length; ind++) {
              indexToObject.put(sumIndex[ind - 1], row[ind]);
            }
            // row[0] is the column name
            sumMap.put((String) row[0], indexToObject);
            Deadline.checkTimeout();
          }
          end = doTrace ? System.nanoTime() : 0;
          MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
        }
        long numParts = partNames.size();
        for (Map.Entry<String, String[]> entry : extraColumnNameTypeParts.entrySet()) {
          Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2];
          String colName = entry.getKey();
          String colType = entry.getValue()[0];
          // number of partitions that actually have stats for this column (always >= 2 here)
          long sumVal = Long.parseLong(entry.getValue()[1]);
          // fill in colname
          row[0] = colName;
          // fill in coltype
          row[1] = colType;
          // use linear extrapolation. more complicated one can be added in the
          // future.
          IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus();
          // pick the stat indexes that apply to this column type
          String lowerColType = colType.toLowerCase();
          boolean decimal = lowerColType.startsWith("decimal");
          Integer[] index = IExtrapolatePartStatus.indexMaps.get(decimal ? "decimal" : lowerColType);
          // if the colType is not one of the known types (long, double, etc.), then use
          // all indexes.
          if (index == null) {
            index = IExtrapolatePartStatus.indexMaps.get("default");
          }
          // May be null if the sum query returned no row for this column; the sum-type stats
          // are then left null instead of failing with a NullPointerException.
          Map<Integer, Object> colSums = sumMap.get(colName);
          for (int colStatIndex : index) {
            String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex];
            IExtrapolatePartStatus.AggrType aggrType = IExtrapolatePartStatus.aggrTypes[colStatIndex];
            if (aggrType == IExtrapolatePartStatus.AggrType.Sum) {
              // if the aggregation type is sum, we do a scale-up
              Object o = colSums == null ? null : colSums.get(colStatIndex);
              if (o == null) {
                row[2 + colStatIndex] = null;
              } else {
                long val = MetastoreDirectSqlUtils.extractSqlLong(o);
                // Exact floor(val * numParts / sumVal). Dividing first would truncate the
                // per-partition average before scaling; splitting off the remainder keeps the
                // result exact without the overflow risk of computing val * numParts directly.
                row[2 + colStatIndex] = val / sumVal * numParts + (val % sumVal) * numParts / sumVal;
              }
            } else if (aggrType == IExtrapolatePartStatus.AggrType.Min
                || aggrType == IExtrapolatePartStatus.AggrType.Max) {
              // if the aggregation type is min/max, we extrapolate from the
              // left/right borders; decimals are stored as text and need a cast to sort correctly
              String orderBy = decimal
                  ? "cast(\"" + colStatName + "\" as decimal)"
                  : "\"" + colStatName + "\"";
              queryText = "select \"" + colStatName + "\",\"PART_NAME\"" + fromAndTableFilter
                  + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
                  + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
                  + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
                  + " order by " + orderBy;
              start = doTrace ? System.nanoTime() : 0;
              try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
                Object qResult = executeWithArray(query.getInnerQuery(),
                    prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText);
                if (qResult == null) {
                  return Collections.emptyList();
                }
                ForwardQueryResult<?> fqr = (ForwardQueryResult<?>) qResult;
                int numRows = fqr.size();
                // An empty result leaves the stat null rather than failing on get(0).
                Object[] min = numRows == 0 ? null : (Object[]) (fqr.get(0));
                Object[] max = numRows == 0 ? null : (Object[]) (fqr.get(numRows - 1));
                end = doTrace ? System.nanoTime() : 0;
                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
                if (min == null || max == null || min[0] == null || max[0] == null) {
                  row[2 + colStatIndex] = null;
                } else {
                  row[2 + colStatIndex] = extrapolateMethod
                      .extrapolate(min, max, colStatIndex, indexMap);
                }
              }
            } else {
              // if the aggregation type is avg, we use the average on the existing ones.
              queryText = "select "
                  + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
                  + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
                  + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
                  + fromAndTableFilter
                  + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
                  + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
                  + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
                  + " group by \"COLUMN_NAME\"";
              start = doTrace ? System.nanoTime() : 0;
              try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
                Object qResult = executeWithArray(query.getInnerQuery(),
                    prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText);
                if (qResult == null) {
                  return Collections.emptyList();
                }
                ForwardQueryResult<?> fqr = (ForwardQueryResult<?>) qResult;
                if (fqr.size() == 0) {
                  // No stats row matched; leave the stat null rather than failing on get(0).
                  row[2 + colStatIndex] = null;
                } else {
                  Object[] avg = (Object[]) (fqr.get(0));
                  // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
                  // "AVG_DECIMAL"
                  row[2 + colStatIndex] = avg[colStatIndex - 12];
                }
                end = doTrace ? System.nanoTime() : 0;
                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
              }
            }
          }
          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
          Deadline.checkTimeout();
        }
      }
      return colStats;
    }
  }

  /**
   * Builds the FROM/JOIN/WHERE fragment shared by every query issued by {@code aggrStatsUseDB}:
   * PART_COL_STATS joined to PARTITIONS, TBLS and DBS, filtered by three positional parameters
   * (catalog name, database name, table name, in that order).
   */
  private String aggrStatsFromAndTableFilter() {
    return " from " + PART_COL_STATS
        + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\""
        + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? ";
  }