in pinot-controller/src/main/java/org/apache/pinot/controller/util/AutoAddInvertedIndex.java [142:322]
/**
 * Adds, refreshes or removes auto-generated inverted indexes on every table in the cluster whose name matches
 * {@code _tableNamePattern} (all tables when the pattern is {@code null}), choosing columns via broker queries.
 *
 * <p>For each table resource:
 * <ul>
 *   <li>If the table already has an auto-generated inverted index: it is skipped under {@code NEW} mode, removed
 *   under {@code REMOVE} mode, and its column list is cleared and rebuilt under {@code REFRESH} mode. In any other
 *   mode the existing columns are kept and new ones may be appended.</li>
 *   <li>If the table has a non-empty, manually configured (non-auto-generated) inverted index, it is left alone.</li>
 *   <li>Tables without a schema, without dimension columns, without a time column in the validation config, or whose
 *   time column is missing from the schema or is of type STRING are skipped.</li>
 *   <li>Tables whose {@code COUNT(*)} is at most {@code _tableSizeThreshold} are skipped.</li>
 * </ul>
 * For the remaining tables, {@code DISTINCTCOUNT} is queried per dimension on the rows having the max time value.
 * The results are sorted using {@link ResultPair}'s natural order (presumably descending cardinality — confirm
 * against {@code ResultPair.compareTo}). Up to {@code _maxNumInvertedIndexAdded} columns are taken in that order,
 * stopping at the first one whose cardinality does not exceed {@code _cardinalityThreshold}.
 *
 * @throws NullPointerException if a table resource has no table config
 * @throws IllegalStateException if a table is flagged as having an auto-generated inverted index but its column
 *     list is null or empty
 * @throws Exception propagated from {@code sendQuery} / {@code updateIndexConfig} (e.g. broker failures)
 */
private void runQueryStrategy()
    throws Exception {
  // Get all resources in cluster
  List<String> resourcesInCluster = _helixAdmin.getResourcesInCluster(_clusterName);
  for (String tableNameWithType : resourcesInCluster) {
    // Skip non-table resources
    if (!TableNameBuilder.isTableResource(tableNameWithType)) {
      continue;
    }
    // Skip tables that do not match the defined name pattern
    if (_tableNamePattern != null && !tableNameWithType.matches(_tableNamePattern)) {
      continue;
    }
    LOGGER.info("Table: {} matches the table name pattern: {}", tableNameWithType, _tableNamePattern);
    // Get the inverted index config
    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
    Preconditions.checkNotNull(tableConfig, "Failed to find table config for table: %s", tableNameWithType);
    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
    List<String> invertedIndexColumns = indexingConfig.getInvertedIndexColumns();
    boolean autoGeneratedInvertedIndex = indexingConfig.isAutoGeneratedInvertedIndex();
    // Handle auto-generated inverted index
    if (autoGeneratedInvertedIndex) {
      // Check for null explicitly so a malformed config fails with the intended message instead of a bare NPE
      Preconditions.checkState(invertedIndexColumns != null && !invertedIndexColumns.isEmpty(),
          "Auto-generated inverted index list is empty");
      // NEW mode, skip
      if (_mode == Mode.NEW) {
        LOGGER.info(
            "Table: {}, skip adding inverted index because it has auto-generated inverted index and under NEW mode",
            tableNameWithType);
        continue;
      }
      // REMOVE mode, remove the inverted index and update
      if (_mode == Mode.REMOVE) {
        invertedIndexColumns.clear();
        indexingConfig.setAutoGeneratedInvertedIndex(false);
        if (updateIndexConfig(tableNameWithType, tableConfig)) {
          LOGGER.info("Table: {}, removed auto-generated inverted index", tableNameWithType);
        } else {
          LOGGER.error("Table: {}, failed to remove auto-generated inverted index", tableNameWithType);
        }
        continue;
      }
      // REFRESH mode, drop the previously auto-generated columns so they are recomputed from scratch below
      if (_mode == Mode.REFRESH) {
        invertedIndexColumns.clear();
      }
    } else {
      // Handle null inverted index columns
      if (invertedIndexColumns == null) {
        invertedIndexColumns = new ArrayList<>();
        indexingConfig.setInvertedIndexColumns(invertedIndexColumns);
      }
      // Remove empty strings in a single pass. The contains() guard avoids calling removeIf when there is nothing
      // to remove, so lists that are never mutated are not touched (some List implementations reject removeIf).
      if (invertedIndexColumns.contains("")) {
        invertedIndexColumns.removeIf(""::equals);
      }
      // Skip non-empty non-auto-generated inverted index
      if (!invertedIndexColumns.isEmpty()) {
        LOGGER.info("Table: {}, skip adding inverted index because it has non-auto-generated inverted index",
            tableNameWithType);
        continue;
      }
    }
    // Skip tables without a schema
    Schema tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType);
    if (tableSchema == null) {
      LOGGER.info("Table: {}, skip adding inverted index because it does not have a schema", tableNameWithType);
      continue;
    }
    // Skip tables without dimensions
    List<String> dimensionNames = tableSchema.getDimensionNames();
    if (dimensionNames.isEmpty()) {
      LOGGER.info("Table: {}, skip adding inverted index because it does not have any dimension column",
          tableNameWithType);
      continue;
    }
    // Skip tables without a proper time column
    String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
    if (timeColumnName == null) {
      LOGGER.info(
          "Table: {}, skip adding inverted index because it does not have a time column specified in the table "
              + "config", tableNameWithType);
      continue;
    }
    DateTimeFieldSpec dateTimeSpec = tableSchema.getSpecForTimeColumn(timeColumnName);
    if (dateTimeSpec == null || dateTimeSpec.getDataType() == FieldSpec.DataType.STRING) {
      LOGGER.info("Table: {}, skip adding inverted index because it does not have a numeric time column",
          tableNameWithType);
      continue;
    }
    TimeUnit timeUnit = dateTimeSpec.getFormatSpec().getColumnUnit();
    if (timeUnit != TimeUnit.DAYS) {
      // Fixed: previously used a malformed "{]" placeholder and omitted the table name argument
      LOGGER.warn("Table: {}, time column {} has non-DAYS time unit: {}", tableNameWithType, timeColumnName,
          timeUnit);
    }
    // Only add inverted index to table larger than a threshold
    JsonNode queryResponse = sendQuery("SELECT COUNT(*) FROM " + tableNameWithType);
    long numTotalDocs = queryResponse.get("totalDocs").asLong();
    LOGGER.info("Table: {}, number of total documents: {}", tableNameWithType, numTotalDocs);
    if (numTotalDocs <= _tableSizeThreshold) {
      LOGGER.info("Table: {}, skip adding inverted index because the table is too small", tableNameWithType);
      continue;
    }
    // Get each dimension's cardinality on one timestamp's data
    queryResponse = sendQuery("SELECT Max(" + timeColumnName + ") FROM " + tableNameWithType);
    long maxTimeStamp = queryResponse.get("aggregationResults").get(0).get("value").asLong();
    LOGGER.info("Table: {}, max time column {}: {}", tableNameWithType, timeColumnName, maxTimeStamp);
    // Query DISTINCTCOUNT on all dimensions in one query might cause timeout, so query them separately
    List<ResultPair> resultPairs = new ArrayList<>(dimensionNames.size());
    for (String dimensionName : dimensionNames) {
      String query =
          "SELECT DISTINCTCOUNT(" + dimensionName + ") FROM " + tableNameWithType + " WHERE " + timeColumnName + " = "
              + maxTimeStamp;
      queryResponse = sendQuery(query);
      JsonNode result = queryResponse.get("aggregationResults").get(0);
      // Key by the queried schema column name rather than parsing it back out of the broker's echoed function
      // name, which can throw on short names or yield a name that does not match the schema column
      resultPairs.add(new ResultPair(dimensionName, result.get("value").asLong()));
    }
    // Sort the dimensions based on their cardinalities
    Collections.sort(resultPairs);
    // Add the top dimensions into inverted index columns
    int numInvertedIndex = Math.min(_maxNumInvertedIndexAdded, resultPairs.size());
    for (int i = 0; i < numInvertedIndex; i++) {
      ResultPair resultPair = resultPairs.get(i);
      String columnName = resultPair._key;
      long cardinality = resultPair._value;
      if (cardinality > _cardinalityThreshold) {
        // Do not append inverted index if already exists
        if (!invertedIndexColumns.contains(columnName)) {
          invertedIndexColumns.add(columnName);
        }
        LOGGER.info("Table: {}, add inverted index to column {} with cardinality: {}", tableNameWithType, columnName,
            cardinality);
      } else {
        // Results are sorted, so no later column can pass the threshold either
        LOGGER.info("Table: {}, skip adding inverted index to column {} with cardinality: {}", tableNameWithType,
            columnName, cardinality);
        break;
      }
    }
    // Update indexing config
    if (!invertedIndexColumns.isEmpty()) {
      indexingConfig.setAutoGeneratedInvertedIndex(true);
      if (updateIndexConfig(tableNameWithType, tableConfig)) {
        LOGGER.info("Table: {}, added inverted index to columns: {}", tableNameWithType, invertedIndexColumns);
      } else {
        LOGGER.error("Table: {}, failed to add inverted index to columns: {}", tableNameWithType,
            invertedIndexColumns);
      }
    } else if (autoGeneratedInvertedIndex) {
      // Only REFRESH can reach here: it is the only non-returning mode that clears an auto-generated list
      Preconditions.checkState(_mode == Mode.REFRESH);
      // Remove existing auto-generated inverted index because no column matches all the conditions
      indexingConfig.setAutoGeneratedInvertedIndex(false);
      if (updateIndexConfig(tableNameWithType, tableConfig)) {
        LOGGER.info("Table: {}, removed auto-generated inverted index", tableNameWithType);
      } else {
        LOGGER.error("Table: {}, failed to remove auto-generated inverted index", tableNameWithType);
      }
    }
  }
}