StaleSegment isSegmentStale()

in pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java [1288:1536]


  /**
   * Checks whether a loaded segment is out of date with respect to the current table config and schema.
   *
   * <p>The checks run in the following order and the first mismatch found is the one reported:
   * <ol>
   *   <li>table level: time column, schema columns missing from the segment, star-tree index builder configs;</li>
   *   <li>for every physical column of the segment: column removed from the schema, field type, data type,
   *   single / multi value, dictionary encoding, sorted column, bloom filter / json / text / FST / H3 / inverted /
   *   null value vector / range indexes, and partitioning.</li>
   * </ol>
   *
   * @param indexLoadingConfig supplies the table config and schema to compare against; both are expected to be
   *                           non-null
   * @param segmentDataManager data manager of the segment to inspect
   * @return a {@link StaleSegment} carrying the segment name, the stale flag and, when stale, a short reason; the
   *         reason is {@code null} when no mismatch was found
   */
  StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) {
    TableConfig tableConfig = indexLoadingConfig.getTableConfig();
    Schema schema = indexLoadingConfig.getSchema();
    assert tableConfig != null && schema != null;

    String tableNameWithType = tableConfig.getTableName();
    Map<String, FieldIndexConfigs> indexConfigsMap =
        FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema);

    String segmentName = segmentDataManager.getSegmentName();
    IndexSegment segment = segmentDataManager.getSegment();
    SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
    Set<String> segmentPhysicalColumns = segment.getPhysicalColumnNames();

    // Time column changed
    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
    if (timeColumn != null) {
      if (segmentMetadata.getTimeColumn() == null || !segmentMetadata.getTimeColumn().equals(timeColumn)) {
        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: time column", tableNameWithType, segmentName);
        return new StaleSegment(segmentName, true, "time column");
      }
    }

    // Only the first configured sorted column is considered.
    List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
    String sortedColumn = CollectionUtils.isNotEmpty(sortedColumns) ? sortedColumns.get(0) : null;

    String partitionColumn = null;
    ColumnPartitionConfig partitionConfig = null;
    SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
    // NOTE: Partition can only be enabled on a single column
    if (segmentPartitionConfig != null && segmentPartitionConfig.getColumnPartitionMap().size() == 1) {
      Map.Entry<String, ColumnPartitionConfig> entry =
          segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
      partitionColumn = entry.getKey();
      partitionConfig = entry.getValue();
    }

    Set<String> columnsInSegment = segmentMetadata.getAllColumns();

    // Column is added
    if (!columnsInSegment.containsAll(schema.getPhysicalColumnNames())) {
      LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: column added", tableNameWithType, segmentName);
      return new StaleSegment(segmentName, true, "column added");
    }

    // Get Index configuration for the Table Config
    Set<String> noDictionaryColumns =
        FieldIndexConfigsUtil.columnsWithIndexDisabled(StandardIndexes.dictionary(), indexConfigsMap);
    Set<String> bloomFilters =
        FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.bloomFilter(), indexConfigsMap);
    Set<String> jsonIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.json(), indexConfigsMap);
    Set<String> invertedIndex =
        FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.inverted(), indexConfigsMap);
    Set<String> nullValueVectorIndex =
        FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.nullValueVector(), indexConfigsMap);
    Set<String> rangeIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.range(), indexConfigsMap);
    Set<String> h3Indexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.h3(), indexConfigsMap);
    Set<String> fstIndexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.fst(), indexConfigsMap);
    Set<String> textIndexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.text(), indexConfigsMap);
    List<StarTreeIndexConfig> starTreeIndexConfigsFromTableConfig =
        tableConfig.getIndexingConfig().getStarTreeIndexConfigs();

    // Get the StarTree indexes currently loaded for the segment (may be null).
    List<StarTreeV2> starTreeIndexMetadata = segment.getStarTrees();

    // Generate StarTree index builder config from the metadata of each loaded StarTree index.
    List<StarTreeV2BuilderConfig> builderConfigFromSegmentMetadata = new ArrayList<>();
    if (starTreeIndexMetadata != null) {
      for (StarTreeV2 starTreeV2 : starTreeIndexMetadata) {
        builderConfigFromSegmentMetadata.add(StarTreeV2BuilderConfig.fromMetadata(starTreeV2.getMetadata()));
      }
    }

    // Generate StarTree index builder configs from the table config.
    List<StarTreeV2BuilderConfig> builderConfigFromTableConfigs =
        StarTreeBuilderUtils.generateBuilderConfigs(starTreeIndexConfigsFromTableConfig,
            tableConfig.getIndexingConfig().isEnableDefaultStarTree(), segmentMetadata);

    // Check if there is a mismatch between the StarTree index builder configs from the table config and the segment
    // metadata.
    if (!StarTreeBuilderUtils.areStarTreeBuilderConfigListsEqual(builderConfigFromTableConfigs,
        builderConfigFromSegmentMetadata)) {
      LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: startree index", tableNameWithType,
          segmentName);
      return new StaleSegment(segmentName, true, "startree index");
    }

    for (String columnName : segmentPhysicalColumns) {
      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(columnName);
      FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
      DataSource source = segment.getDataSource(columnName);
      Preconditions.checkNotNull(columnMetadata);
      Preconditions.checkNotNull(source);

      // Column is deleted
      if (fieldSpecInSchema == null) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: column deleted",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "column deleted: " + columnName);
      }

      // Field type changed
      if (columnMetadata.getFieldType().compareTo(fieldSpecInSchema.getFieldType()) != 0) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: field type", tableNameWithType,
            columnName, segmentName);
        return new StaleSegment(segmentName, true, "field type changed: " + columnName);
      }

      // Data type changed
      if (!columnMetadata.getDataType().equals(fieldSpecInSchema.getDataType())) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: data type", tableNameWithType,
            columnName, segmentName);
        return new StaleSegment(segmentName, true, "data type changed: " + columnName);
      }

      // SV/MV changed
      if (columnMetadata.isSingleValue() != fieldSpecInSchema.isSingleValueField()) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: single / multi value",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "single / multi value changed: " + columnName);
      }

      // TODO: detect if an index changes from Dictionary to Variable Length Dictionary or vice versa.
      // TODO: RV TEST
      boolean colHasDictionary = columnMetadata.hasDictionary();
      // Encoding changed: the segment has a dictionary while the table config disables it, or the segment has no
      // dictionary while the table config does not disable it.
      if (colHasDictionary == noDictionaryColumns.contains(columnName)) {
        // Check if dictionary update is needed
        // 1. If the segment metadata has dictionary enabled and table has it disabled, its incompatible and refresh is
        // needed.
        // 2. If segment metadata has dictionary disabled, check if it has to be overridden. If not overridden,
        // refresh is needed, since table has it enabled.
        boolean incompatible = colHasDictionary || DictionaryIndexType.ignoreDictionaryOverride(
            tableConfig.getIndexingConfig().isOptimizeDictionary(),
            tableConfig.getIndexingConfig().isOptimizeDictionaryForMetrics(),
            tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(),
            tableConfig.getIndexingConfig().getNoDictionaryCardinalityRatioThreshold(), fieldSpecInSchema,
            indexConfigsMap.get(columnName), columnMetadata.getCardinality(), columnMetadata.getTotalNumberOfEntries());
        if (incompatible) {
          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: dictionary encoding",
              tableNameWithType, columnName, segmentName);
          return new StaleSegment(segmentName, true, "dictionary encoding changed: " + columnName);
        } else {
          LOGGER.debug("tableNameWithType: {}, segmentName: {}, no change as dictionary overrides applied to col: {}",
              tableNameWithType, segmentName, columnName);
        }
      }

      // Sorted column not sorted
      if (columnName.equals(sortedColumn) && !columnMetadata.isSorted()) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: sort column", tableNameWithType,
            columnName, segmentName);
        return new StaleSegment(segmentName, true, "sort column changed: " + columnName);
      }

      // For each of the index checks below, "index missing in segment" == "index enabled in table config" means the
      // segment and the table config disagree (index must be either added or removed).
      if (Objects.isNull(source.getBloomFilter()) == bloomFilters.contains(columnName)) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: bloom filter changed",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "bloom filter changed: " + columnName);
      }

      if (Objects.isNull(source.getJsonIndex()) == jsonIndex.contains(columnName)) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: json index changed",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "json index changed: " + columnName);
      }

      if (Objects.isNull(source.getTextIndex()) == textIndexes.contains(columnName)) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: text index changed",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "text index changed: " + columnName);
      }

      if (Objects.isNull(source.getFSTIndex()) == fstIndexes.contains(columnName)) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: fst index changed",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "fst index changed: " + columnName);
      }

      if (Objects.isNull(source.getH3Index()) == h3Indexes.contains(columnName)) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: h3 index changed",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "h3 index changed: " + columnName);
      }

      // If a segment is sorted then it will automatically be given an inverted index and that overrides the
      // TableConfig setting
      if (columnMetadata.isSorted()) {
        // If a column is sorted and does not have an inverted index but the table config does have an inverted index.
        // But do not remove the inverted index from a sorted column even if the table config has no inverted index.
        if (Objects.isNull(source.getInvertedIndex()) && invertedIndex.contains(columnName)) {
          LOGGER.debug(
              "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index added to sorted column",
              tableNameWithType, columnName, segmentName);
          return new StaleSegment(segmentName, true, "invert index added to sort column: " + columnName);
        }
      } else {
        if ((Objects.isNull(source.getInvertedIndex())) == invertedIndex.contains(columnName)) {
          LOGGER.debug(
              "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index changed on unsorted "
                  + "column", tableNameWithType, columnName, segmentName);
          return new StaleSegment(segmentName, true, "inverted index changed on unsorted column: " + columnName);
        }
      }

      // If a column has a NVV Reader and the Table Config says that it should not, then the NVV Reader can be removed.
      // BUT if a column does NOT have a NVV Reader it cannot be added after the segment is created. So, for this check
      // only check to see if an existing NVV Reader should be removed, but do not check if an NVV Reader needs to be
      // added.
      if (!Objects.isNull(source.getNullValueVector()) && !nullValueVectorIndex.contains(columnName)) {
        LOGGER.debug(
            "tableNameWithType: {}, columnName: {}, segmentName: {}, change: null value vector index removed from "
                + "column and cannot be added back to this segment.", tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "null value vector index removed from column: " + columnName);
      }

      if (Objects.isNull(source.getRangeIndex()) == rangeIndex.contains(columnName)) {
        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: range index changed",
            tableNameWithType, columnName, segmentName);
        return new StaleSegment(segmentName, true, "range index changed: " + columnName);
      }

      // Partition changed or segment not properly partitioned.
      // partitionConfig is non-null here: it is assigned together with partitionColumn, and columnName is never null.
      if (columnName.equals(partitionColumn)) {
        PartitionFunction partitionFunction = columnMetadata.getPartitionFunction();
        if (partitionFunction == null) {
          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function",
              tableNameWithType, columnName, segmentName);
          return new StaleSegment(segmentName, true, "partition function added: " + columnName);
        }
        if (!partitionFunction.getName().equalsIgnoreCase(partitionConfig.getFunctionName())) {
          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function name",
              tableNameWithType, columnName, segmentName);
          return new StaleSegment(segmentName, true, "partition function name changed: " + columnName);
        }
        if (partitionFunction.getNumPartitions() != partitionConfig.getNumPartitions()) {
          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: num partitions",
              tableNameWithType, columnName, segmentName);
          return new StaleSegment(segmentName, true, "num partitions changed: " + columnName);
        }
        // A properly partitioned segment is expected to hold exactly one partition for the partition column.
        Set<Integer> partitions = columnMetadata.getPartitions();
        if (partitions == null || partitions.size() != 1) {
          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partitions", tableNameWithType,
              columnName, segmentName);
          return new StaleSegment(segmentName, true, "partitions changed: " + columnName);
        }
      }
    }

    return new StaleSegment(segmentName, false, null);
  }