public MutableSegmentImpl()

in pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java [187:422]


  public MutableSegmentImpl(RealtimeSegmentConfig config, @Nullable ServerMetrics serverMetrics) {
    _serverMetrics = serverMetrics;
    _realtimeTableName = config.getTableNameWithType();
    _segmentName = config.getSegmentName();
    _schema = config.getSchema();
    _timeColumnName = config.getTimeColumnName();
    _capacity = config.getCapacity();
    SegmentZKMetadata segmentZKMetadata = config.getSegmentZKMetadata();
    _segmentMetadata = new SegmentMetadataImpl(TableNameBuilder.extractRawTableName(_realtimeTableName),
        segmentZKMetadata.getSegmentName(), _schema, segmentZKMetadata.getCreationTime()) {
      @Override
      public int getTotalDocs() {
        return _numDocsIndexed;
      }

      @Override
      public long getLastIndexedTimestamp() {
        return _lastIndexedTimeMs;
      }

      @Override
      public long getLatestIngestionTimestamp() {
        return _latestIngestionTimeMs;
      }

      @Override
      public boolean isMutableSegment() {
        return true;
      }
    };

    _offHeap = config.isOffHeap();
    _memoryManager = config.getMemoryManager();
    _statsHistory = config.getStatsHistory();
    _partitionColumn = config.getPartitionColumn();
    _partitionFunction = config.getPartitionFunction();
    _mainPartitionId = config.getPartitionId();
    _defaultNullHandlingEnabled = config.isNullHandlingEnabled();
    _consumerDir = new File(config.getConsumerDir());

    Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
    List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
    List<DimensionFieldSpec> physicalDimensionFieldSpecs = new ArrayList<>(_schema.getDimensionNames().size());
    List<MetricFieldSpec> physicalMetricFieldSpecs = new ArrayList<>(_schema.getMetricNames().size());
    List<String> physicalTimeColumnNames = new ArrayList<>();
    List<ComplexFieldSpec> physicalComplexFieldSpecs = new ArrayList<>();

    for (FieldSpec fieldSpec : allFieldSpecs) {
      if (!fieldSpec.isVirtualColumn()) {
        physicalFieldSpecs.add(fieldSpec);
        FieldSpec.FieldType fieldType = fieldSpec.getFieldType();
        if (fieldType == FieldSpec.FieldType.DIMENSION) {
          physicalDimensionFieldSpecs.add((DimensionFieldSpec) fieldSpec);
        } else if (fieldType == FieldSpec.FieldType.METRIC) {
          physicalMetricFieldSpecs.add((MetricFieldSpec) fieldSpec);
        } else if (fieldType == FieldSpec.FieldType.DATE_TIME || fieldType == FieldSpec.FieldType.TIME) {
          physicalTimeColumnNames.add(fieldSpec.getName());
        } else if (fieldType == FieldSpec.FieldType.COMPLEX) {
          physicalComplexFieldSpecs.add((ComplexFieldSpec) fieldSpec);
        }
      }
    }
    _physicalFieldSpecs = Collections.unmodifiableCollection(physicalFieldSpecs);
    _physicalDimensionFieldSpecs = Collections.unmodifiableCollection(physicalDimensionFieldSpecs);
    _physicalMetricFieldSpecs = Collections.unmodifiableCollection(physicalMetricFieldSpecs);
    _physicalTimeColumnNames = Collections.unmodifiableCollection(physicalTimeColumnNames);
    _physicalComplexFieldSpecs = Collections.unmodifiableCollection(physicalComplexFieldSpecs);

    _numKeyColumns = _physicalDimensionFieldSpecs.size() + _physicalTimeColumnNames.size();

    _logger =
        LoggerFactory.getLogger(MutableSegmentImpl.class.getName() + "_" + _segmentName + "_" + config.getStreamName());

    // Metric aggregation can be enabled only if config is specified, and all dimensions have dictionary,
    // and no metrics have dictionary. If not enabled, the map returned is null.
    _recordIdMap = enableMetricsAggregationIfPossible(config);

    Map<String, Pair<String, ValueAggregator>> metricsAggregators = Collections.emptyMap();
    if (_recordIdMap != null) {
      metricsAggregators = getMetricsAggregators(config);
    }

    Set<IndexType> specialIndexes =
        Sets.newHashSet(StandardIndexes.dictionary(), // dictionary implements other contract
            StandardIndexes.nullValueVector()); // null value vector implements other contract

    // Initialize for each column
    for (FieldSpec fieldSpec : _physicalFieldSpecs) {
      String column = fieldSpec.getName();

      int fixedByteSize = -1;
      DataType dataType = fieldSpec.getDataType();
      DataType storedType = dataType.getStoredType();
      if (!storedType.isFixedWidth()) {
        // For aggregated metrics, we need to store values with fixed byte size so that in-place replacement is possible
        Pair<String, ValueAggregator> aggregatorPair = metricsAggregators.get(column);
        if (aggregatorPair != null) {
          fixedByteSize = aggregatorPair.getRight().getMaxAggregatedValueByteSize();
        }
      }

      FieldIndexConfigs indexConfigs =
          Optional.ofNullable(config.getIndexConfigByCol().get(column)).orElse(FieldIndexConfigs.EMPTY);
      boolean isDictionary = !isNoDictionaryColumn(indexConfigs, fieldSpec, column);
      MutableIndexContext context =
          MutableIndexContext.builder().withFieldSpec(fieldSpec).withMemoryManager(_memoryManager)
              .withDictionary(isDictionary).withCapacity(_capacity).offHeap(_offHeap).withSegmentName(_segmentName)
              .withEstimatedCardinality(_statsHistory.getEstimatedCardinality(column))
              .withEstimatedColSize(_statsHistory.getEstimatedAvgColSize(column))
              .withAvgNumMultiValues(_statsHistory.getEstimatedAvgColSize(column))
              .withConsumerDir(_consumerDir)
              .withFixedLengthBytes(fixedByteSize).build();

      // Partition info
      PartitionFunction partitionFunction = null;
      Set<Integer> partitions = null;
      if (column.equals(_partitionColumn)) {
        partitionFunction = _partitionFunction;

        // NOTE: Use a concurrent set because the partitions can be updated when the partition of the ingested record
        //       does not match the stream partition. This could happen when stream partition changes, or the records
        //       are not properly partitioned from the stream. Log a warning and emit a metric if it happens, then add
        //       the new partition into this set.
        partitions = ConcurrentHashMap.newKeySet();
        partitions.add(_mainPartitionId);
      }

      // TODO (mutable-index-spi): The comment above was here, but no check was done.
      //  It seems the code that apply that check was removed around 2020. Should we remove the comment?
      // Check whether to generate raw index for the column while consuming
      // Only support generating raw index on single-value columns that do not have inverted index while
      // consuming. After consumption completes and the segment is built, all single-value columns can have raw index

      // Dictionary-encoded column
      MutableDictionary dictionary;
      if (isDictionary) {
        DictionaryIndexConfig dictionaryIndexConfig = indexConfigs.getConfig(StandardIndexes.dictionary());
        if (dictionaryIndexConfig.isDisabled()) {
          // Even if dictionary is disabled in the config, isNoDictionaryColumn(...) returned false, so
          // we are going to create a dictionary.
          // This may happen for several reasons. For example, when there is a inverted index on the column.
          // See isNoDictionaryColumn to have more context.
          dictionaryIndexConfig = DictionaryIndexConfig.DEFAULT;
        }
        dictionary = DictionaryIndexType.createMutableDictionary(context, dictionaryIndexConfig);
      } else {
        dictionary = null;
        if (!fieldSpec.isSingleValueField()) {
          // Raw MV columns
          switch (storedType) {
            case INT:
            case LONG:
            case FLOAT:
            case DOUBLE:
              break;
            default:
              throw new UnsupportedOperationException(
                  "Unsupported data type: " + dataType + " for MV no-dictionary column: " + column);
          }
        }
      }

      // Null value vector
      MutableNullValueVector nullValueVector;
      if (isNullable(fieldSpec)) {
        _logger.info("Column: {} is nullable", column);
        nullValueVector = new MutableNullValueVector();
      } else {
        _logger.info("Column: {} is not nullable", column);
        nullValueVector = null;
      }

      Map<IndexType, MutableIndex> mutableIndexes = new HashMap<>();
      for (IndexType<?, ?, ?> indexType : IndexService.getInstance().getAllIndexes()) {
        if (!specialIndexes.contains(indexType)) {
          addMutableIndex(mutableIndexes, indexType, context, indexConfigs);
        }
      }

      Pair<String, ValueAggregator> columnAggregatorPair =
          metricsAggregators.getOrDefault(column, Pair.of(column, null));
      String sourceColumn = columnAggregatorPair.getLeft();
      ValueAggregator valueAggregator = columnAggregatorPair.getRight();

      // TODO this can be removed after forward index contents no longer depends on text index configs
      // If the raw value is provided, use it for the forward/dictionary index of this column by wrapping the
      // already created MutableIndex with a SameValue implementation. This optimization can only be done when
      // the mutable index is being reused
      Object rawValueForTextIndex = indexConfigs.getConfig(StandardIndexes.text()).getRawValueForTextIndex();
      boolean reuseMutableIndex = indexConfigs.getConfig(StandardIndexes.text()).isReuseMutableIndex();
      if (rawValueForTextIndex != null && reuseMutableIndex) {
        if (dictionary == null) {
          MutableIndex forwardIndex = mutableIndexes.get(StandardIndexes.forward());
          mutableIndexes.put(StandardIndexes.forward(),
              new SameValueMutableForwardIndex(rawValueForTextIndex, (MutableForwardIndex) forwardIndex));
        } else {
          dictionary = new SameValueMutableDictionary(rawValueForTextIndex, dictionary);
        }
      }

      _indexContainerMap.put(column,
          new IndexContainer(fieldSpec, partitionFunction, partitions, new ValuesInfo(), mutableIndexes, dictionary,
              nullValueVector, sourceColumn, valueAggregator));
    }

    _partitionDedupMetadataManager = config.getPartitionDedupMetadataManager();
    _dedupTimeColumn =
        _partitionDedupMetadataManager != null ? _partitionDedupMetadataManager.getContext().getDedupTimeColumn()
            : null;

    _partitionUpsertMetadataManager = config.getPartitionUpsertMetadataManager();
    if (_partitionUpsertMetadataManager != null) {
      Preconditions.checkState(!isAggregateMetricsEnabled(),
          "Metrics aggregation and upsert cannot be enabled together");
      UpsertContext upsertContext = _partitionUpsertMetadataManager.getContext();
      _upsertComparisonColumns = upsertContext.getComparisonColumns();
      _deleteRecordColumn = upsertContext.getDeleteRecordColumn();
      _upsertDropOutOfOrderRecord = upsertContext.isDropOutOfOrderRecord();
      _upsertOutOfOrderRecordColumn = upsertContext.getOutOfOrderRecordColumn();
      _upsertConsistencyMode = upsertContext.getConsistencyMode();
      _validDocIds = new ThreadSafeMutableRoaringBitmap();
      if (_deleteRecordColumn != null) {
        _queryableDocIds = new ThreadSafeMutableRoaringBitmap();
      } else {
        _queryableDocIds = null;
      }
    } else {
      _upsertComparisonColumns = null;
      _deleteRecordColumn = null;
      _upsertDropOutOfOrderRecord = false;
      _upsertOutOfOrderRecordColumn = null;
      _upsertConsistencyMode = null;
      _validDocIds = null;
      _queryableDocIds = null;
    }
  }