in pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java [1288:1536]
/**
 * Checks whether the given segment is out of date with respect to the table config and schema carried by
 * {@code indexLoadingConfig}.
 *
 * <p>The checks run in a fixed order and the method returns on the first mismatch found:
 * <ol>
 *   <li>time column configured in the table config differs from the one recorded in the segment metadata;</li>
 *   <li>the schema has a physical column that the segment metadata does not list;</li>
 *   <li>the star-tree builder configs derived from the table config differ from those derived from the segment;</li>
 *   <li>for every physical column of the segment: column removed from the schema, field type, data type,
 *       single/multi value, dictionary encoding, sorted column, bloom filter, json, text, fst, h3, inverted,
 *       null value vector, range index, and partitioning.</li>
 * </ol>
 *
 * @param indexLoadingConfig source of the table config and schema; both are expected to be non-null
 * @param segmentDataManager holder of the segment to inspect
 * @return a {@code StaleSegment} built with {@code true} and a short reason string for the first mismatch found,
 *         or one built with {@code false} and a {@code null} reason when no mismatch is found
 */
StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) {
  TableConfig tableConfig = indexLoadingConfig.getTableConfig();
  Schema schema = indexLoadingConfig.getSchema();
  assert tableConfig != null && schema != null;
  String tableNameWithType = tableConfig.getTableName();
  Map<String, FieldIndexConfigs> indexConfigsMap =
      FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema);

  String segmentName = segmentDataManager.getSegmentName();
  IndexSegment segment = segmentDataManager.getSegment();
  SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
  Set<String> segmentPhysicalColumns = segment.getPhysicalColumnNames();

  // Time column changed
  String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
  if (timeColumn != null) {
    if (segmentMetadata.getTimeColumn() == null || !segmentMetadata.getTimeColumn().equals(timeColumn)) {
      LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: time column", tableNameWithType, segmentName);
      return new StaleSegment(segmentName, true, "time column");
    }
  }

  // Only the first configured sorted column is considered.
  List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
  String sortedColumn = CollectionUtils.isNotEmpty(sortedColumns) ? sortedColumns.get(0) : null;

  String partitionColumn = null;
  ColumnPartitionConfig partitionConfig = null;
  SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
  // NOTE: Partition can only be enabled on a single column
  if (segmentPartitionConfig != null && segmentPartitionConfig.getColumnPartitionMap().size() == 1) {
    Map.Entry<String, ColumnPartitionConfig> entry =
        segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
    partitionColumn = entry.getKey();
    partitionConfig = entry.getValue();
  }

  // Column is added
  Set<String> columnsInSegment = segmentMetadata.getAllColumns();
  if (!columnsInSegment.containsAll(schema.getPhysicalColumnNames())) {
    LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: column added", tableNameWithType, segmentName);
    return new StaleSegment(segmentName, true, "column added");
  }

  // Get Index configuration for the Table Config
  Set<String> noDictionaryColumns =
      FieldIndexConfigsUtil.columnsWithIndexDisabled(StandardIndexes.dictionary(), indexConfigsMap);
  Set<String> bloomFilters =
      FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.bloomFilter(), indexConfigsMap);
  Set<String> jsonIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.json(), indexConfigsMap);
  Set<String> invertedIndex =
      FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.inverted(), indexConfigsMap);
  Set<String> nullValueVectorIndex =
      FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.nullValueVector(), indexConfigsMap);
  Set<String> rangeIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.range(), indexConfigsMap);
  Set<String> h3Indexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.h3(), indexConfigsMap);
  Set<String> fstIndexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.fst(), indexConfigsMap);
  Set<String> textIndexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.text(), indexConfigsMap);

  List<StarTreeIndexConfig> starTreeIndexConfigsFromTableConfig =
      tableConfig.getIndexingConfig().getStarTreeIndexConfigs();
  // Get the StarTree indexes currently present in the segment (may be null).
  List<StarTreeV2> starTreeIndexMetadata = segment.getStarTrees();
  // Generate StarTree index builder configs from the metadata of each StarTree index in the segment.
  List<StarTreeV2BuilderConfig> builderConfigFromSegmentMetadata = new ArrayList<>();
  if (starTreeIndexMetadata != null) {
    for (StarTreeV2 starTreeV2 : starTreeIndexMetadata) {
      builderConfigFromSegmentMetadata.add(StarTreeV2BuilderConfig.fromMetadata(starTreeV2.getMetadata()));
    }
  }
  // Generate StarTree index builder configs from the table config.
  List<StarTreeV2BuilderConfig> builderConfigFromTableConfigs =
      StarTreeBuilderUtils.generateBuilderConfigs(starTreeIndexConfigsFromTableConfig,
          tableConfig.getIndexingConfig().isEnableDefaultStarTree(), segmentMetadata);
  // Check if there is a mismatch between the StarTree index builder configs from the table config and the segment
  // metadata.
  if (!StarTreeBuilderUtils.areStarTreeBuilderConfigListsEqual(builderConfigFromTableConfigs,
      builderConfigFromSegmentMetadata)) {
    LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: startree index", tableNameWithType, segmentName);
    return new StaleSegment(segmentName, true, "startree index");
  }

  for (String columnName : segmentPhysicalColumns) {
    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(columnName);
    FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
    DataSource source = segment.getDataSource(columnName);
    Preconditions.checkNotNull(columnMetadata);
    Preconditions.checkNotNull(source);

    // Column is deleted
    if (fieldSpecInSchema == null) {
      LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: column deleted",
          tableNameWithType, columnName, segmentName);
      return new StaleSegment(segmentName, true, "column deleted: " + columnName);
    }

    // Field type changed
    if (columnMetadata.getFieldType().compareTo(fieldSpecInSchema.getFieldType()) != 0) {
      LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: field type", tableNameWithType,
          columnName, segmentName);
      return new StaleSegment(segmentName, true, "field type changed: " + columnName);
    }

    // Data type changed
    if (!columnMetadata.getDataType().equals(fieldSpecInSchema.getDataType())) {
      LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: data type", tableNameWithType,
          columnName, segmentName);
      return new StaleSegment(segmentName, true, "data type changed: " + columnName);
    }

    // SV/MV changed
    if (columnMetadata.isSingleValue() != fieldSpecInSchema.isSingleValueField()) {
      LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: single / multi value",
          tableNameWithType, columnName, segmentName);
      return new StaleSegment(segmentName, true, "single / multi value changed: " + columnName);
    }

    // TODO: detect if an index changes from Dictionary to Variable Length Dictionary or vice versa.
    boolean colHasDictionary = columnMetadata.hasDictionary();
    // Encoding changed: the segment has a dictionary while the table config disables it, or the segment has no
    // dictionary while the table config does not disable it.
    if (colHasDictionary == noDictionaryColumns.contains(columnName)) {
      // Check if dictionary update is needed
      // 1. If the segment metadata has dictionary enabled and table has it disabled, its incompatible and refresh is
      //    needed.
      // 2. If segment metadata has dictionary disabled, check if it has to be overridden. If not overridden,
      //    refresh is needed, since table has it enabled.
      boolean incompatible = colHasDictionary || DictionaryIndexType.ignoreDictionaryOverride(
          tableConfig.getIndexingConfig().isOptimizeDictionary(),
          tableConfig.getIndexingConfig().isOptimizeDictionaryForMetrics(),
          tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(),
          tableConfig.getIndexingConfig().getNoDictionaryCardinalityRatioThreshold(), fieldSpecInSchema,
          indexConfigsMap.get(columnName), columnMetadata.getCardinality(), columnMetadata.getTotalNumberOfEntries());
      if (incompatible) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: dictionary encoding,",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "dictionary encoding changed: " + columnName);
      } else {
        LOGGER.debug("tableNameWithType: {}, segmentName: {}, no change as dictionary overrides applied to col: {}",
            tableNameWithType, segmentName, columnName);
      }
    }

    // Sorted column not sorted
    if (columnName.equals(sortedColumn) && !columnMetadata.isSorted()) {
      LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: sort column", tableNameWithType,
          columnName, segmentName);
      return new StaleSegment(segmentName, true, "sort column changed: " + columnName);
    }

    // For each of the following indexes, "index absent in segment" == "index enabled in table config" means the
    // segment and the table config disagree (absent but enabled, or present but not enabled).
    if ((source.getBloomFilter() == null) == bloomFilters.contains(columnName)) {
      LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: bloom filter changed",
          tableNameWithType, columnName, segmentName);
      return new StaleSegment(segmentName, true, "bloom filter changed: " + columnName);
    }

    if ((source.getJsonIndex() == null) == jsonIndex.contains(columnName)) {
      LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: json index changed",
          tableNameWithType, columnName, segmentName);
      return new StaleSegment(segmentName, true, "json index changed: " + columnName);
    }

    if ((source.getTextIndex() == null) == textIndexes.contains(columnName)) {
      LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: text index changed",
          tableNameWithType, columnName, segmentName);
      return new StaleSegment(segmentName, true, "text index changed: " + columnName);
    }

    if ((source.getFSTIndex() == null) == fstIndexes.contains(columnName)) {
      LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: fst index changed",
          tableNameWithType, columnName, segmentName);
      return new StaleSegment(segmentName, true, "fst index changed: " + columnName);
    }

    if ((source.getH3Index() == null) == h3Indexes.contains(columnName)) {
      LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: h3 index changed",
          tableNameWithType, columnName, segmentName);
      return new StaleSegment(segmentName, true, "h3 index changed: " + columnName);
    }

    // If a column is sorted then it will automatically be given an inverted index and that overrides the
    // TableConfig setting
    if (columnMetadata.isSorted()) {
      // If a column is sorted and does not have an inverted index but the table config does have an inverted index.
      // But do not remove the inverted index from a sorted column even if the table config has no inverted index.
      if (source.getInvertedIndex() == null && invertedIndex.contains(columnName)) {
        LOGGER.debug(
            "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index added to sorted column",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "invert index added to sort column: " + columnName);
      }
    } else {
      if ((source.getInvertedIndex() == null) == invertedIndex.contains(columnName)) {
        LOGGER.debug(
            "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index changed on unsorted "
                + "column", tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "inverted index changed on unsorted column: " + columnName);
      }
    }

    // If a column has a NVV Reader and the Table Config says that it should not, then the NVV Reader can be removed.
    // BUT if a column does NOT have a NVV Reader it cannot be added after the segment is created. So, for this check
    // only check to see if an existing NVV Reader should be removed, but do not check if an NVV Reader needs to be
    // added.
    if (source.getNullValueVector() != null && !nullValueVectorIndex.contains(columnName)) {
      LOGGER.debug(
          "tableNameWithType: {}, columnName: {}, segmentName: {}, change: null value vector index removed from "
              + "column and cannot be added back to this segment.", tableNameWithType, columnName, segmentName);
      return new StaleSegment(segmentName, true, "null value vector index removed from column: " + columnName);
    }

    if ((source.getRangeIndex() == null) == rangeIndex.contains(columnName)) {
      LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: range index changed",
          tableNameWithType, columnName, segmentName);
      return new StaleSegment(segmentName, true, "range index changed: " + columnName);
    }

    // Partition changed or segment not properly partitioned.
    // partitionConfig is non-null here: it is assigned together with partitionColumn above.
    if (columnName.equals(partitionColumn)) {
      PartitionFunction partitionFunction = columnMetadata.getPartitionFunction();
      if (partitionFunction == null) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "partition function added: " + columnName);
      }
      if (!partitionFunction.getName().equalsIgnoreCase(partitionConfig.getFunctionName())) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function name",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "partition function name changed: " + columnName);
      }
      if (partitionFunction.getNumPartitions() != partitionConfig.getNumPartitions()) {
        LOGGER.debug("tableNameWithType: {}, columnName: {},, segmentName: {}, change: num partitions",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "num partitions changed: " + columnName);
      }
      // The segment must hold exactly one partition of the partition column.
      Set<Integer> partitions = columnMetadata.getPartitions();
      if (partitions == null || partitions.size() != 1) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partitions", tableNameWithType,
            columnName, segmentName);
        return new StaleSegment(segmentName, true, "partitions changed: " + columnName);
      }
    }
  }

  return new StaleSegment(segmentName, false, null);
}