in standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java [1956:2254]
private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String dbName,
String tableName, List<String> partNames, List<String> colNames, String engine,
boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
// TODO: all the extrapolation logic should be moved out of this class,
// only mechanical data retrieval should remain here.
String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+ "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+ "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+ "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+ "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
// The following data is used to compute a partitioned table's NDV based
// on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
// accurately derived from partition NDVs, because the domain of column value two partitions
// can overlap. If there is no overlap then global NDV is just the sum
// of partition NDVs (UpperBound). But if there is some overlay then
// global NDV can be anywhere between sum of partition NDVs (no overlap)
// and same as one of the partition NDV (domain of column value in all other
// partitions is subset of the domain value in one of the partition)
// (LowerBound).But under uniform distribution, we can roughly estimate the global
// NDV by leveraging the min/max values.
// And, we also guarantee that the estimation makes sense by comparing it to the
// UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
// and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
+ "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+ "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+ "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+ "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
+ " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
+ " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\""
+ " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? ";
String queryText = null;
long start = 0;
long end = 0;
boolean doTrace = LOG.isDebugEnabled();
ForwardQueryResult<?> fqr = null;
// Check if the status of all the columns of all the partitions exists
// Extrapolation is not needed.
if (areAllPartsFound) {
queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
+ " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
+ " and \"ENGINE\" = ? "
+ " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
start = doTrace ? System.nanoTime() : 0;
try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
Object qResult = executeWithArray(query.getInnerQuery(),
prepareParams(catName, dbName, tableName, partNames, colNames,
engine), queryText);
if (qResult == null) {
return Collections.emptyList();
}
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
List<ColumnStatisticsObj> colStats =
new ArrayList<ColumnStatisticsObj>(list.size());
for (Object[] row : list) {
colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
useDensityFunctionForNDVEstimation, ndvTuner));
Deadline.checkTimeout();
}
return colStats;
}
} else {
// Extrapolation is needed for some columns.
// In this case, at least a column status for a partition is missing.
// We need to extrapolate this partition based on the other partitions
List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PART_COL_STATS\".\"PART_ID\") "
+ " from " + PART_COL_STATS
+ " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
+ " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\""
+ " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
+ " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
+ " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
+ " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
+ " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " + PART_COL_STATS + ".\"COLUMN_TYPE\"";
start = doTrace ? System.nanoTime() : 0;
List<String> noExtraColumnNames = new ArrayList<String>();
Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, String[]>();
try(QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
Object qResult = executeWithArray(query.getInnerQuery(),
prepareParams(catName, dbName, tableName, partNames, colNames,
engine), queryText);
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
if (qResult == null) {
return Collections.emptyList();
}
List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
for (Object[] row : list) {
String colName = (String) row[0];
String colType = (String) row[1];
// Extrapolation is not needed for this column if
// count(\"PARTITION_NAME\")==partNames.size()
// Or, extrapolation is not possible for this column if
// count(\"PARTITION_NAME\")<2
Long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]);
if (count == partNames.size() || count < 2) {
noExtraColumnNames.add(colName);
} else {
extraColumnNameTypeParts.put(colName, new String[] {colType, String.valueOf(count)});
}
Deadline.checkTimeout();
}
}
// Extrapolation is not needed for columns noExtraColumnNames
List<Object[]> list;
if (noExtraColumnNames.size() != 0) {
queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
+ makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in ("
+ makeParams(partNames.size()) + ")"
+ " and \"ENGINE\" = ? "
+ " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
start = doTrace ? System.nanoTime() : 0;
try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
Object qResult = executeWithArray(query.getInnerQuery(),
prepareParams(catName, dbName, tableName, partNames, noExtraColumnNames, engine), queryText);
if (qResult == null) {
return Collections.emptyList();
}
list = MetastoreDirectSqlUtils.ensureList(qResult);
for (Object[] row : list) {
colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
useDensityFunctionForNDVEstimation, ndvTuner));
Deadline.checkTimeout();
}
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
}
}
// Extrapolation is needed for extraColumnNames.
// give a sequence number for all the partitions
if (extraColumnNameTypeParts.size() != 0) {
Map<String, Integer> indexMap = new HashMap<String, Integer>();
for (int index = 0; index < partNames.size(); index++) {
indexMap.put(partNames.get(index), index);
}
// get sum for all columns to reduce the number of queries
Map<String, Map<Integer, Object>> sumMap = new HashMap<String, Map<Integer, Object>>();
queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
+ " from " + PART_COL_STATS
+ " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
+ " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\""
+ " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
+ " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size()) + ")"
+ " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
+ " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
+ " group by " + PART_COL_STATS + ".\"COLUMN_NAME\"";
start = doTrace ? System.nanoTime() : 0;
try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
List<String> extraColumnNames = new ArrayList<String>();
extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
Object qResult = executeWithArray(query.getInnerQuery(),
prepareParams(catName, dbName, tableName, partNames,
extraColumnNames, engine), queryText);
if (qResult == null) {
return Collections.emptyList();
}
list = MetastoreDirectSqlUtils.ensureList(qResult);
// see the indexes for colstats in IExtrapolatePartStatus
Integer[] sumIndex = new Integer[] {6, 10, 11, 15};
for (Object[] row : list) {
Map<Integer, Object> indexToObject = new HashMap<Integer, Object>();
for (int ind = 1; ind < row.length; ind++) {
indexToObject.put(sumIndex[ind - 1], row[ind]);
}
// row[0] is the column name
sumMap.put((String) row[0], indexToObject);
Deadline.checkTimeout();
}
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
}
for (Map.Entry<String, String[]> entry : extraColumnNameTypeParts.entrySet()) {
Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2];
String colName = entry.getKey();
String colType = entry.getValue()[0];
Long sumVal = Long.parseLong(entry.getValue()[1]);
// fill in colname
row[0] = colName;
// fill in coltype
row[1] = colType;
// use linear extrapolation. more complicated one can be added in the
// future.
IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus();
// fill in colstatus
Integer[] index = null;
boolean decimal = false;
if (colType.toLowerCase().startsWith("decimal")) {
index = IExtrapolatePartStatus.indexMaps.get("decimal");
decimal = true;
} else {
index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
}
// if the colType is not the known type, long, double, etc, then get
// all index.
if (index == null) {
index = IExtrapolatePartStatus.indexMaps.get("default");
}
for (int colStatIndex : index) {
String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex];
// if the aggregation type is sum, we do a scale-up
if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Sum) {
Object o = sumMap.get(colName).get(colStatIndex);
if (o == null) {
row[2 + colStatIndex] = null;
} else {
Long val = MetastoreDirectSqlUtils.extractSqlLong(o);
row[2 + colStatIndex] = val / sumVal * (partNames.size());
}
} else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min
|| IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) {
// if the aggregation type is min/max, we extrapolate from the
// left/right borders
if (!decimal) {
queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS
+ " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
+ " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\""
+ " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
+ " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
+ " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
+ " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
+ " order by \"" + colStatName + "\"";
} else {
queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS
+ " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
+ " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\""
+ " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
+ " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
+ " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
+ " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
+ " order by cast(\"" + colStatName + "\" as decimal)";
}
start = doTrace ? System.nanoTime() : 0;
try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
Object qResult = executeWithArray(query.getInnerQuery(),
prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText);
if (qResult == null) {
return Collections.emptyList();
}
fqr = (ForwardQueryResult<?>) qResult;
Object[] min = (Object[]) (fqr.get(0));
Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
if (min[0] == null || max[0] == null) {
row[2 + colStatIndex] = null;
} else {
row[2 + colStatIndex] = extrapolateMethod
.extrapolate(min, max, colStatIndex, indexMap);
}
}
} else {
// if the aggregation type is avg, we use the average on the existing ones.
queryText = "select "
+ "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+ "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+ "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
+ " from " + PART_COL_STATS + ""
+ " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
+ " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\""
+ " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
+ " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
+ " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")"
+ " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
+ " group by \"COLUMN_NAME\"";
start = doTrace ? System.nanoTime() : 0;
try(QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
Object qResult = executeWithArray(query.getInnerQuery(),
prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText);
if (qResult == null) {
return Collections.emptyList();
}
fqr = (ForwardQueryResult<?>) qResult;
Object[] avg = (Object[]) (fqr.get(0));
// colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
// "AVG_DECIMAL"
row[2 + colStatIndex] = avg[colStatIndex - 12];
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
}
}
}
colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
Deadline.checkTimeout();
}
}
return colStats;
}
}