in pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java [187:422]
public MutableSegmentImpl(RealtimeSegmentConfig config, @Nullable ServerMetrics serverMetrics) {
_serverMetrics = serverMetrics;
_realtimeTableName = config.getTableNameWithType();
_segmentName = config.getSegmentName();
_schema = config.getSchema();
_timeColumnName = config.getTimeColumnName();
_capacity = config.getCapacity();
SegmentZKMetadata segmentZKMetadata = config.getSegmentZKMetadata();
_segmentMetadata = new SegmentMetadataImpl(TableNameBuilder.extractRawTableName(_realtimeTableName),
segmentZKMetadata.getSegmentName(), _schema, segmentZKMetadata.getCreationTime()) {
@Override
public int getTotalDocs() {
return _numDocsIndexed;
}
@Override
public long getLastIndexedTimestamp() {
return _lastIndexedTimeMs;
}
@Override
public long getLatestIngestionTimestamp() {
return _latestIngestionTimeMs;
}
@Override
public boolean isMutableSegment() {
return true;
}
};
_offHeap = config.isOffHeap();
_memoryManager = config.getMemoryManager();
_statsHistory = config.getStatsHistory();
_partitionColumn = config.getPartitionColumn();
_partitionFunction = config.getPartitionFunction();
_mainPartitionId = config.getPartitionId();
_defaultNullHandlingEnabled = config.isNullHandlingEnabled();
_consumerDir = new File(config.getConsumerDir());
Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
List<DimensionFieldSpec> physicalDimensionFieldSpecs = new ArrayList<>(_schema.getDimensionNames().size());
List<MetricFieldSpec> physicalMetricFieldSpecs = new ArrayList<>(_schema.getMetricNames().size());
List<String> physicalTimeColumnNames = new ArrayList<>();
List<ComplexFieldSpec> physicalComplexFieldSpecs = new ArrayList<>();
for (FieldSpec fieldSpec : allFieldSpecs) {
if (!fieldSpec.isVirtualColumn()) {
physicalFieldSpecs.add(fieldSpec);
FieldSpec.FieldType fieldType = fieldSpec.getFieldType();
if (fieldType == FieldSpec.FieldType.DIMENSION) {
physicalDimensionFieldSpecs.add((DimensionFieldSpec) fieldSpec);
} else if (fieldType == FieldSpec.FieldType.METRIC) {
physicalMetricFieldSpecs.add((MetricFieldSpec) fieldSpec);
} else if (fieldType == FieldSpec.FieldType.DATE_TIME || fieldType == FieldSpec.FieldType.TIME) {
physicalTimeColumnNames.add(fieldSpec.getName());
} else if (fieldType == FieldSpec.FieldType.COMPLEX) {
physicalComplexFieldSpecs.add((ComplexFieldSpec) fieldSpec);
}
}
}
_physicalFieldSpecs = Collections.unmodifiableCollection(physicalFieldSpecs);
_physicalDimensionFieldSpecs = Collections.unmodifiableCollection(physicalDimensionFieldSpecs);
_physicalMetricFieldSpecs = Collections.unmodifiableCollection(physicalMetricFieldSpecs);
_physicalTimeColumnNames = Collections.unmodifiableCollection(physicalTimeColumnNames);
_physicalComplexFieldSpecs = Collections.unmodifiableCollection(physicalComplexFieldSpecs);
_numKeyColumns = _physicalDimensionFieldSpecs.size() + _physicalTimeColumnNames.size();
_logger =
LoggerFactory.getLogger(MutableSegmentImpl.class.getName() + "_" + _segmentName + "_" + config.getStreamName());
// Metric aggregation can be enabled only if config is specified, and all dimensions have dictionary,
// and no metrics have dictionary. If not enabled, the map returned is null.
_recordIdMap = enableMetricsAggregationIfPossible(config);
Map<String, Pair<String, ValueAggregator>> metricsAggregators = Collections.emptyMap();
if (_recordIdMap != null) {
metricsAggregators = getMetricsAggregators(config);
}
Set<IndexType> specialIndexes =
Sets.newHashSet(StandardIndexes.dictionary(), // dictionary implements other contract
StandardIndexes.nullValueVector()); // null value vector implements other contract
// Initialize for each column
for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
int fixedByteSize = -1;
DataType dataType = fieldSpec.getDataType();
DataType storedType = dataType.getStoredType();
if (!storedType.isFixedWidth()) {
// For aggregated metrics, we need to store values with fixed byte size so that in-place replacement is possible
Pair<String, ValueAggregator> aggregatorPair = metricsAggregators.get(column);
if (aggregatorPair != null) {
fixedByteSize = aggregatorPair.getRight().getMaxAggregatedValueByteSize();
}
}
FieldIndexConfigs indexConfigs =
Optional.ofNullable(config.getIndexConfigByCol().get(column)).orElse(FieldIndexConfigs.EMPTY);
boolean isDictionary = !isNoDictionaryColumn(indexConfigs, fieldSpec, column);
MutableIndexContext context =
MutableIndexContext.builder().withFieldSpec(fieldSpec).withMemoryManager(_memoryManager)
.withDictionary(isDictionary).withCapacity(_capacity).offHeap(_offHeap).withSegmentName(_segmentName)
.withEstimatedCardinality(_statsHistory.getEstimatedCardinality(column))
.withEstimatedColSize(_statsHistory.getEstimatedAvgColSize(column))
.withAvgNumMultiValues(_statsHistory.getEstimatedAvgColSize(column))
.withConsumerDir(_consumerDir)
.withFixedLengthBytes(fixedByteSize).build();
// Partition info
PartitionFunction partitionFunction = null;
Set<Integer> partitions = null;
if (column.equals(_partitionColumn)) {
partitionFunction = _partitionFunction;
// NOTE: Use a concurrent set because the partitions can be updated when the partition of the ingested record
// does not match the stream partition. This could happen when stream partition changes, or the records
// are not properly partitioned from the stream. Log a warning and emit a metric if it happens, then add
// the new partition into this set.
partitions = ConcurrentHashMap.newKeySet();
partitions.add(_mainPartitionId);
}
// TODO (mutable-index-spi): The comment above was here, but no check was done.
// It seems the code that apply that check was removed around 2020. Should we remove the comment?
// Check whether to generate raw index for the column while consuming
// Only support generating raw index on single-value columns that do not have inverted index while
// consuming. After consumption completes and the segment is built, all single-value columns can have raw index
// Dictionary-encoded column
MutableDictionary dictionary;
if (isDictionary) {
DictionaryIndexConfig dictionaryIndexConfig = indexConfigs.getConfig(StandardIndexes.dictionary());
if (dictionaryIndexConfig.isDisabled()) {
// Even if dictionary is disabled in the config, isNoDictionaryColumn(...) returned false, so
// we are going to create a dictionary.
// This may happen for several reasons. For example, when there is a inverted index on the column.
// See isNoDictionaryColumn to have more context.
dictionaryIndexConfig = DictionaryIndexConfig.DEFAULT;
}
dictionary = DictionaryIndexType.createMutableDictionary(context, dictionaryIndexConfig);
} else {
dictionary = null;
if (!fieldSpec.isSingleValueField()) {
// Raw MV columns
switch (storedType) {
case INT:
case LONG:
case FLOAT:
case DOUBLE:
break;
default:
throw new UnsupportedOperationException(
"Unsupported data type: " + dataType + " for MV no-dictionary column: " + column);
}
}
}
// Null value vector
MutableNullValueVector nullValueVector;
if (isNullable(fieldSpec)) {
_logger.info("Column: {} is nullable", column);
nullValueVector = new MutableNullValueVector();
} else {
_logger.info("Column: {} is not nullable", column);
nullValueVector = null;
}
Map<IndexType, MutableIndex> mutableIndexes = new HashMap<>();
for (IndexType<?, ?, ?> indexType : IndexService.getInstance().getAllIndexes()) {
if (!specialIndexes.contains(indexType)) {
addMutableIndex(mutableIndexes, indexType, context, indexConfigs);
}
}
Pair<String, ValueAggregator> columnAggregatorPair =
metricsAggregators.getOrDefault(column, Pair.of(column, null));
String sourceColumn = columnAggregatorPair.getLeft();
ValueAggregator valueAggregator = columnAggregatorPair.getRight();
// TODO this can be removed after forward index contents no longer depends on text index configs
// If the raw value is provided, use it for the forward/dictionary index of this column by wrapping the
// already created MutableIndex with a SameValue implementation. This optimization can only be done when
// the mutable index is being reused
Object rawValueForTextIndex = indexConfigs.getConfig(StandardIndexes.text()).getRawValueForTextIndex();
boolean reuseMutableIndex = indexConfigs.getConfig(StandardIndexes.text()).isReuseMutableIndex();
if (rawValueForTextIndex != null && reuseMutableIndex) {
if (dictionary == null) {
MutableIndex forwardIndex = mutableIndexes.get(StandardIndexes.forward());
mutableIndexes.put(StandardIndexes.forward(),
new SameValueMutableForwardIndex(rawValueForTextIndex, (MutableForwardIndex) forwardIndex));
} else {
dictionary = new SameValueMutableDictionary(rawValueForTextIndex, dictionary);
}
}
_indexContainerMap.put(column,
new IndexContainer(fieldSpec, partitionFunction, partitions, new ValuesInfo(), mutableIndexes, dictionary,
nullValueVector, sourceColumn, valueAggregator));
}
_partitionDedupMetadataManager = config.getPartitionDedupMetadataManager();
_dedupTimeColumn =
_partitionDedupMetadataManager != null ? _partitionDedupMetadataManager.getContext().getDedupTimeColumn()
: null;
_partitionUpsertMetadataManager = config.getPartitionUpsertMetadataManager();
if (_partitionUpsertMetadataManager != null) {
Preconditions.checkState(!isAggregateMetricsEnabled(),
"Metrics aggregation and upsert cannot be enabled together");
UpsertContext upsertContext = _partitionUpsertMetadataManager.getContext();
_upsertComparisonColumns = upsertContext.getComparisonColumns();
_deleteRecordColumn = upsertContext.getDeleteRecordColumn();
_upsertDropOutOfOrderRecord = upsertContext.isDropOutOfOrderRecord();
_upsertOutOfOrderRecordColumn = upsertContext.getOutOfOrderRecordColumn();
_upsertConsistencyMode = upsertContext.getConsistencyMode();
_validDocIds = new ThreadSafeMutableRoaringBitmap();
if (_deleteRecordColumn != null) {
_queryableDocIds = new ThreadSafeMutableRoaringBitmap();
} else {
_queryableDocIds = null;
}
} else {
_upsertComparisonColumns = null;
_deleteRecordColumn = null;
_upsertDropOutOfOrderRecord = false;
_upsertOutOfOrderRecordColumn = null;
_upsertConsistencyMode = null;
_validDocIds = null;
_queryableDocIds = null;
}
}