in fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java [391:673]
public void analyze(Analyzer analyzer) throws AnalysisException {
// Resolve and analyze this table ref so we can evaluate partition predicates.
TableRef tableRef = new TableRef(tableName_.toPath(), null, Privilege.ALTER);
tableRef = analyzer.resolveTableRef(tableRef);
Preconditions.checkNotNull(tableRef);
tableRef.analyze(analyzer);
if (tableRef instanceof InlineViewRef) {
throw new AnalysisException(String.format(
"COMPUTE STATS not supported for view: %s", tableName_));
}
if (tableRef instanceof CollectionTableRef) {
throw new AnalysisException(String.format(
"COMPUTE STATS not supported for nested collection: %s", tableName_));
}
if (tableRef instanceof SystemTableRef) {
throw new AnalysisException(String.format(
"COMPUTE STATS not supported for system table: %s", tableName_));
}
table_ = analyzer.getTable(tableName_, Privilege.ALTER, Privilege.SELECT);
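// COMPUTE INCREMENTAL ... PARTITION is only supported for HDFS-partitioned
// tables; for other table types a plain COMPUTE INCREMENTAL silently falls
// back to a full computation.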
if (!(table_ instanceof FeFsTable)) {
if (partitionSet_ != null) {
throw new AnalysisException("COMPUTE INCREMENTAL ... PARTITION not supported " +
"for non-HDFS table " + tableName_);
}
isIncremental_ = false;
}
if (table_ instanceof FeIcebergTable) {
if (partitionSet_ != null) {
throw new AnalysisException("COMPUTE INCREMENTAL ... PARTITION not supported " +
"for Iceberg table " + tableName_);
}
isIncremental_ = false;
}
if (columnWhitelist_ != null) {
validatedColumnWhitelist_ = new HashSet<>();
for (String colName : columnWhitelist_) {
Column col = table_.getColumn(colName);
if (col == null) {
throw new AnalysisException(colName + " not found in table: " +
table_.getName());
}
if (table_ instanceof FeFsTable && table_.isClusteringColumn(col)) {
throw new AnalysisException("COMPUTE STATS not supported for partitioning " +
"column " + col.getName() + " of HDFS table.");
}
if (ignoreColumn(col)) {
throw new AnalysisException("COMPUTE STATS not supported for column " +
col.getName() + " of complex type: " + col.getType().toSql());
}
validatedColumnWhitelist_.add(col);
}
}
FeFsTable hdfsTable = null;
if (table_ instanceof FeFsTable) {
hdfsTable = (FeFsTable)table_;
if (hdfsTable.usesAvroSchemaOverride()) checkIncompleteAvroSchema(hdfsTable);
if (isIncremental_ && hdfsTable.getNumClusteringCols() == 0 &&
partitionSet_ != null) {
throw new AnalysisException(String.format(
"Can't compute PARTITION stats on an unpartitioned table: %s",
tableName_));
} else if (partitionSet_ != null) {
Preconditions.checkState(tableRef instanceof BaseTableRef);
partitionSet_.setPartitionShouldExist();
partitionSet_.analyze(analyzer);
}
// For incremental stats, estimate the size of the intermediate stats and report an
// error if the estimate exceeds --inc_stats_size_limit_bytes.
if (isIncremental_) {
long numOfAllIncStatsPartitions = 0;
Collection<? extends FeFsPartition> allPartitions = hdfsTable.loadAllPartitions();
if (partitionSet_ == null) {
numOfAllIncStatsPartitions = allPartitions.size();
} else {
Set<Long> partIds =
Sets.newHashSetWithExpectedSize(partitionSet_.getPartitions().size());
for (FeFsPartition part: partitionSet_.getPartitions()) {
partIds.add(part.getId());
}
// Estimated incremental stats size
//   = stats of partitions that already have incremental stats
//   - stats of partitions that are being recomputed (to avoid double counting)
//   + stats of the partitions computed by this statement.
for (FeFsPartition part: allPartitions) {
// Count partitions that already have incremental stats and are not being
// recomputed this time. This is the "existing - recomputed" part of the
// estimate above.
if (part.hasIncrementalStats() && !partIds.contains(part.getId())) {
++numOfAllIncStatsPartitions;
}
}
// Add the partitions that are recomputed by this statement.
numOfAllIncStatsPartitions += partitionSet_.getPartitions().size();
}
long incStatMaxSize = BackendConfig.INSTANCE.getIncStatsMaxSize();
// Estimated combined size of the existing stats and the stats to be computed.
long statsSizeEstimate = hdfsTable.getColumns().size() *
numOfAllIncStatsPartitions * HdfsTable.STATS_SIZE_PER_COLUMN_BYTES;
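// Illustrative example with hypothetical numbers: 100 columns and 10,000
// partitions with incremental stats give an estimate of
// 100 * 10,000 * STATS_SIZE_PER_COLUMN_BYTES bytes, which must not exceed the
// configured limit.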
if (statsSizeEstimate > incStatMaxSize) {
LOG.error("Incremental stats size estimate for table " + hdfsTable.getName() +
" exceeded " + incStatMaxSize + ", estimate = "
+ statsSizeEstimate);
throw new AnalysisException("Incremental stats size estimate exceeds "
+ PrintUtils.printBytes(incStatMaxSize)
+ ". Please try COMPUTE STATS instead.");
}
}
}
// Build partition filters that only select partitions without valid statistics for
// incremental computation.
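// Each predicate selects a single partition; the predicates are later OR'ed
// together into the child queries' WHERE clause.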
List<String> filterPreds = new ArrayList<>();
if (isIncremental_) {
if (partitionSet_ == null) {
// If any column does not have stats, we recompute statistics for all partitions.
// TODO: need a better way to invalidate stats for all partitions, so that we can
// use this logic to only recompute new / changed columns.
boolean tableIsMissingColStats = false;
// We'll warn the user if a column is missing stats (and therefore we rescan the
// whole table), but if all columns are missing stats, the table just doesn't
// have any stats and there's no need to warn.
boolean allColumnsMissingStats = true;
String exampleColumnMissingStats = null;
// Partition columns always have stats, so exclude them from this search
for (Column col: table_.getNonClusteringColumns()) {
if (ignoreColumn(col)) continue;
if (!col.getStats().hasStats()) {
if (!tableIsMissingColStats) {
tableIsMissingColStats = true;
exampleColumnMissingStats = col.getName();
}
} else {
allColumnsMissingStats = false;
}
}
if (tableIsMissingColStats && !allColumnsMissingStats) {
analyzer.addWarning("Column " + exampleColumnMissingStats +
" does not have statistics, recomputing stats for the whole table");
}
// Get incremental statistics from all relevant partitions.
Collection<? extends FeFsPartition> allPartitions = hdfsTable.loadAllPartitions();
Map<Long, TPartitionStats> partitionStats =
getOrFetchPartitionStats(analyzer, hdfsTable, allPartitions,
/* excludedPartitions= */ Collections.<Long>emptySet());
for (FeFsPartition p: allPartitions) {
TPartitionStats partStats = partitionStats.get(p.getId());
if (partStats == null || tableIsMissingColStats) {
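// getConjunctSql() returns a predicate that selects exactly this partition,
// e.g. `year`=2020 AND `month`=1 for a hypothetical (year, month) partition.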
if (!tableIsMissingColStats) filterPreds.add(p.getConjunctSql());
// TODO(vercegovac): check what happens when "NULL" is used as a partitioning
// value.
List<String> partValues = PartitionKeyValue.getPartitionKeyValueStringList(
p.getPartitionValues(), "NULL");
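// e.g., for an illustrative partition (year=2020, month=NULL) this yields
// ["2020", "NULL"].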
expectedPartitions_.add(partValues);
} else {
validPartStats_.add(partStats);
}
}
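// If every partition needs recomputation, expect results for all partitions
// rather than enumerating each one.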
if (expectedPartitions_.size() == hdfsTable.getPartitions().size()) {
expectedPartitions_.clear();
expectAllPartitions_ = true;
}
} else {
// Always compute stats on a set of partitions when told to.
for (FeFsPartition targetPartition: partitionSet_.getPartitions()) {
filterPreds.add(targetPartition.getConjunctSql());
List<String> partValues = PartitionKeyValue.getPartitionKeyValueStringList(
targetPartition.getPartitionValues(), "NULL");
expectedPartitions_.add(partValues);
}
// Create a hash set out of partitionSet_ for O(1) lookups.
// TODO(todd) avoid loading all partitions.
Set<Long> targetPartitions =
Sets.newHashSetWithExpectedSize(partitionSet_.getPartitions().size());
for (FeFsPartition p: partitionSet_.getPartitions()) {
targetPartitions.add(p.getId());
}
// Get incremental statistics for the partitions that are not being recomputed.
Collection<? extends FeFsPartition> allPartitions = hdfsTable.loadAllPartitions();
Map<Long, TPartitionStats> partitionStats = getOrFetchPartitionStats(
analyzer, hdfsTable, allPartitions, targetPartitions);
validPartStats_.addAll(partitionStats.values());
}
if (filterPreds.size() == 0 && validPartStats_.size() != 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("No partitions selected for incremental stats update");
}
analyzer.addWarning("No partitions selected for incremental stats update");
return;
}
} else {
// Not computing incremental stats.
expectAllPartitions_ = true;
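// With stats extrapolation enabled, per-partition row counts are extrapolated
// rather than collected from every partition, so results for all partitions
// are not required.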
if (table_ instanceof FeFsTable) {
expectAllPartitions_ = !FeFsTable.Utils.isStatsExtrapolationEnabled(
(FeFsTable) table_);
}
}
if (filterPreds.size() > MAX_INCREMENTAL_PARTITIONS) {
// TODO: Consider simply running for MAX_INCREMENTAL_PARTITIONS partitions, and then
// advising the user to iterate.
analyzer.addWarning(
"Too many partitions selected, doing full recomputation of incremental stats");
filterPreds.clear();
validPartStats_.clear();
}
// Tablesample clause to be used for all child queries.
String tableSampleSql = analyzeTableSampleClause(analyzer);
// Query for getting the per-partition row count and the total row count.
StringBuilder tableStatsQueryBuilder = new StringBuilder("SELECT ");
String countSql = "COUNT(*)";
if (isSampling()) {
// Extrapolate the count based on the effective sampling rate. Add an explicit CAST
// to BIGINT, which is the expected data type for row count.
countSql = String.format("CAST(ROUND(COUNT(*) / %.10f) AS BIGINT)",
effectiveSamplePerc_);
}
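// For example, effectiveSamplePerc_ = 0.1 produces
// "CAST(ROUND(COUNT(*) / 0.1000000000) AS BIGINT)".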
List<String> tableStatsSelectList = Lists.newArrayList(countSql);
// Add GROUP BY columns when computing incremental stats or when stats
// extrapolation is disabled.
List<String> groupByCols = new ArrayList<>();
if (!updateTableStatsOnly()) {
for (Column partCol: hdfsTable.getClusteringColumns()) {
groupByCols.add(ToSqlUtils.getIdentSql(partCol.getName()));
}
tableStatsSelectList.addAll(groupByCols);
}
tableStatsQueryBuilder.append(Joiner.on(", ").join(tableStatsSelectList));
tableStatsQueryBuilder.append(" FROM " + tableName_.toSql() + tableSampleSql);
// Query for getting the per-column NDVs and number of NULLs.
List<String> columnStatsSelectList = getBaseColumnStatsQuerySelectList(analyzer);
if (isIncremental_) columnStatsSelectList.addAll(groupByCols);
StringBuilder columnStatsQueryBuilder = new StringBuilder("SELECT ");
columnStatsQueryBuilder.append(Joiner.on(", ").join(columnStatsSelectList));
columnStatsQueryBuilder.append(" FROM " + tableName_.toSql() + tableSampleSql);
// Add a WHERE clause that filters out the partitions we don't want to compute
// incremental stats for. While this is a win in most situations, we skip the
// clause when it does no useful work, i.e. when it would select all rows. That
// happens when there are no existing valid partitions (so every partition was
// selected for recomputation) and there is no partition spec (so no single
// partition was explicitly selected).
if (filterPreds.size() > 0 &&
(validPartStats_.size() > 0 || partitionSet_ != null)) {
String filterClause = " WHERE " + Joiner.on(" OR ").join(filterPreds);
columnStatsQueryBuilder.append(filterClause);
tableStatsQueryBuilder.append(filterClause);
}
if (groupByCols.size() > 0) {
String groupBy = " GROUP BY " + Joiner.on(", ").join(groupByCols);
if (isIncremental_) columnStatsQueryBuilder.append(groupBy);
tableStatsQueryBuilder.append(groupBy);
}
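// Illustrative table stats query for a hypothetical table t partitioned by
// (year, month), without sampling:
//   SELECT COUNT(*), year, month FROM t GROUP BY year, month
// With sampling, COUNT(*) is replaced by the extrapolated CAST expression
// above and the TABLESAMPLE clause follows the table name.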
tableStatsQueryStr_ = tableStatsQueryBuilder.toString();
if (LOG.isTraceEnabled()) LOG.trace("Table stats query: " + tableStatsQueryStr_);
if (columnStatsSelectList.isEmpty()) {
// Table doesn't have any columns that we can compute stats for.
if (LOG.isTraceEnabled()) {
LOG.trace("No supported column types in table " + table_.getTableName() +
", no column statistics will be gathered.");
}
columnStatsQueryStr_ = null;
return;
}
columnStatsQueryStr_ = columnStatsQueryBuilder.toString();
if (LOG.isTraceEnabled()) LOG.trace("Column stats query: " + columnStatsQueryStr_);
}