in pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java [1587:1768]
public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator consumerCoordinator,
ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager,
@Nullable PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isReadyToConsumeData)
throws AttemptsExceededException, RetriableOperationException {
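// Wires up everything a consuming segment needs before the consumer thread starts: ZK metadata,
// stream config, memory manager, rate limiters, message decoder, transform pipeline, and the
// mutable segment itself. Any initialization failure is recorded against the segment and rethrown.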
_segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
_segmentZKMetadata = segmentZKMetadata;
_tableConfig = tableConfig;
_tableNameWithType = _tableConfig.getTableName();
_realtimeTableDataManager = realtimeTableDataManager;
_resourceDataDir = resourceDataDir;
_schema = schema;
_llcSegmentName = llcSegmentName;
_consumerCoordinator = consumerCoordinator;
_serverMetrics = serverMetrics;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_partitionDedupMetadataManager = partitionDedupMetadataManager;
_isReadyToConsumeData = isReadyToConsumeData;
_segmentVersion = indexLoadingConfig.getSegmentVersion();
_instanceId = _realtimeTableDataManager.getInstanceId();
_leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
_protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
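// Completion mode: with DOWNLOAD, the server fetches the committed segment instead of building
// it locally; DEFAULT keeps the regular build-and-commit path. DOWNLOAD takes effect only when
// the table's CompletionConfig explicitly asks for it.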
CompletionConfig completionConfig = _tableConfig.getValidationConfig().getCompletionConfig();
_segmentCompletionMode = completionConfig != null
&& CompletionMode.DOWNLOAD.toString().equalsIgnoreCase(completionConfig.getCompletionMode())
? CompletionMode.DOWNLOAD : CompletionMode.DEFAULT;
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
_trackFilteredMessageOffsets = ingestionConfig != null && ingestionConfig.getStreamIngestionConfig() != null
&& ingestionConfig.getStreamIngestionConfig().isTrackFilteredMessageOffsets();
_parallelSegmentConsumptionPolicy = getParallelConsumptionPolicy();
String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
// TODO Validate configs
IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
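// A Pinot partition id encodes both the physical stream partition and (for tables ingesting
// from multiple streams) which stream config it belongs to; IngestionConfigUtils decodes both.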
_partitionGroupId = llcSegmentName.getPartitionGroupId();
_streamPartitionGroupId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId);
_streamConfig = new StreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMaps(_tableConfig)
.get(IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId)));
_streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
_streamPartitionMsgOffsetFactory = _streamConsumerFactory.createStreamMsgOffsetFactory();
String streamTopic = _streamConfig.getTopicName();
_segmentNameStr = _segmentZKMetadata.getSegmentName();
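// Snapshot of this partition group's consumption state from ZK metadata: sequence number,
// start offset, optional end offset, and segment status.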
_partitionGroupConsumptionStatus =
new PartitionGroupConsumptionStatus(_partitionGroupId, llcSegmentName.getSequenceNumber(),
_streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset()),
_segmentZKMetadata.getEndOffset() == null ? null
: _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()),
_segmentZKMetadata.getStatus().toString());
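// Consumer client id: table + topic + stream partition, with an optional operator-configured
// suffix (e.g. to disambiguate multiple instances consuming the same topic).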
InstanceDataManagerConfig instanceDataManagerConfig = indexLoadingConfig.getInstanceDataManagerConfig();
String clientIdSuffix =
instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
if (StringUtils.isNotBlank(clientIdSuffix)) {
_clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPartitionGroupId + "-" + clientIdSuffix;
} else {
_clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPartitionGroupId;
}
_segmentLogger = LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
_tableStreamName = _tableNameWithType + "_" + streamTopic;
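// Choose the memory manager: mmap-backed buffers under the consumer dir for off-heap
// (non-direct) allocation, direct allocation otherwise.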
if (indexLoadingConfig.isRealtimeOffHeapAllocation() && !indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) {
_memoryManager =
new MmapMemoryManager(_realtimeTableDataManager.getConsumerDir(), _segmentNameStr, _serverMetrics);
} else {
// For on-heap allocation, we still need a memory manager for forward index.
// Dictionary will be allocated on heap.
_memoryManager = new DirectMemoryManager(_segmentNameStr, _serverMetrics);
}
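// Consumption is throttled at two levels: a per-partition limiter built from the stream config
// and a shared server-wide limiter.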
_partitionRateLimiter = RealtimeConsumptionRateManager.getInstance()
.createRateLimiter(_streamConfig, _tableNameWithType, _serverMetrics, _clientId);
_serverRateLimiter = RealtimeConsumptionRateManager.getInstance().getServerRateLimiter();
// Resolve the max rows per segment: prefer the ZK metadata size threshold, then the stream
// config's flush threshold, then the default.
int segmentMaxRowCount = segmentZKMetadata.getSizeThresholdToFlushSegment();
if (segmentMaxRowCount <= 0) {
segmentMaxRowCount = _streamConfig.getFlushThresholdRows();
}
if (segmentMaxRowCount <= 0) {
segmentMaxRowCount = StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS;
}
_segmentMaxRowCount = segmentMaxRowCount;
_isOffHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
_defaultNullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
// Start new realtime segment
String consumerDir = realtimeTableDataManager.getConsumerDir();
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig)
.setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentNameStr)
.setStreamName(streamTopic)
.setSchema(_schema)
.setTimeColumnName(timeColumnName)
.setCapacity(_segmentMaxRowCount)
.setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
.setSegmentZKMetadata(segmentZKMetadata)
.setOffHeap(_isOffHeap)
.setMemoryManager(_memoryManager)
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
.setAggregateMetrics(indexingConfig.isAggregateMetrics())
.setIngestionAggregationConfigs(IngestionConfigUtils.getAggregationConfigs(tableConfig))
.setDefaultNullHandlingEnabled(_defaultNullHandlingEnabled)
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setConsumerDir(consumerDir);
// Create message decoder
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig, _schema);
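// Decoder creation can fail transiently (e.g. a schema registry that is not reachable yet), so
// retry with exponential backoff: up to 5 attempts, 1s initial delay, scaling 1.2x per attempt.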
RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f);
AtomicReference<StreamDataDecoder> localStreamDataDecoder = new AtomicReference<>();
try {
retryPolicy.attempt(() -> {
try {
StreamMessageDecoder streamMessageDecoder = createMessageDecoder(fieldsToRead);
localStreamDataDecoder.set(new StreamDataDecoderImpl(streamMessageDecoder));
return true;
} catch (Exception e) {
_segmentLogger.warn("Failed to initialize the StreamMessageDecoder: ", e);
return false;
}
});
} catch (Exception e) {
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
new SegmentErrorInfo(now(), "Failed to initialize the StreamMessageDecoder", e));
throw e;
}
_streamDataDecoder = localStreamDataDecoder.get();
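// The transform pipeline applies the table's ingestion-time transformations to each decoded row
// before it is indexed into the mutable segment.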
try {
_transformPipeline = new TransformPipeline(tableConfig, schema);
} catch (Exception e) {
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
new SegmentErrorInfo(now(), "Failed to initialize the TransformPipeline", e));
throw e;
}
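// Create the stream consumer and the mutable segment. Failures past this point must reach the
// controller so the segment can be reset; see the catch block below.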
try {
_startOffset = _partitionGroupConsumptionStatus.getStartOffset();
_currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset);
makeStreamConsumer("Starting");
createPartitionMetadataProvider("Starting");
setPartitionParameters(realtimeSegmentConfigBuilder, indexingConfig.getSegmentPartitionConfig());
_realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), serverMetrics);
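// Best-effort creation of the temp dir used during segment build (a failure here would surface
// later when the dir is used).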
_resourceTmpDir = new File(resourceDataDir, RESOURCE_TEMP_DIR_NAME);
if (!_resourceTmpDir.exists()) {
_resourceTmpDir.mkdirs();
}
_state = State.INITIAL_CONSUMING;
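// Record the stream's latest offset at startup (5s fetch timeout), e.g. to report how far
// behind this consumer starts.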
_latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000);
setConsumeEndTime(segmentZKMetadata, now());
_segmentCommitterFactory =
new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
} catch (Throwable t) {
// In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from
// ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.
// Hence releasing the semaphore here to unblock reset operation via Helix Admin.
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
"Failed to initialize segment data manager", t));
_segmentLogger.warn(
"Scheduling task to call controller to mark the segment as OFFLINE in Ideal State due"
+ " to initialization error: '{}'",
t.getMessage());
// Since we are going to throw an exception from this thread (the Helix execution thread), the
// external view entry for this segment will be ERROR. We allow time for Helix to make this
// transition, and then invoke the controller API to mark it OFFLINE in the ideal state.
new Thread(() -> {
ConsumptionStopIndicator indicator = new ConsumptionStopIndicator(_currentOffset, _segmentNameStr, _instanceId,
_protocolHandler, "Consuming segment initialization error", _segmentLogger);
try {
// Allow 30s for Helix to mark the current state and external view as ERROR, because we are
// about to receive an ERROR -> OFFLINE state transition once we call the
// postSegmentStoppedConsuming() method.
Thread.sleep(30_000);
indicator.postSegmentStoppedConsuming();
} catch (InterruptedException ie) {
// We got interrupted while trying to post the stop-consumed message. Give up at this point.
return;
}
}).start();
throw t;
}
}