in pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java [732:972]
public void run() {
long initialConsumptionEnd = 0L;
long lastCatchUpStart = 0L;
long catchUpTimeMillis = 0L;
_startTimeMs = now();
try {
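    // Wait until the server signals that this table is ready to consume data.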
if (!_isReadyToConsumeData.getAsBoolean()) {
do {
//noinspection BusyWait
Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
} while (!_shouldStop && !_isReadyToConsumeData.getAsBoolean());
}
// Acquire semaphore before consuming data
try {
_consumerCoordinator.acquire(_llcSegmentName);
} catch (ConsumerCoordinator.ShouldNotConsumeException e) {
_segmentLogger.info("Skipping consumption because: {}", e.getMessage());
return;
}
_consumerSemaphoreAcquired.set(true);
_consumerCoordinator.register(_llcSegmentName);
_segmentLogger.info("Acquired consumer semaphore.");
_consumeStartTime = now();
_segmentLogger.info("Starting consumption on segment: {}, maxRowCount: {}, maxEndTime: {}.", _llcSegmentName,
_segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC));
// TODO:
// When reaching here, the current consuming segment has already acquired the consumer semaphore, but there is
// no guarantee that the previous consuming segment is already persisted (replaced with immutable segment). It
// can potentially cause the following problems:
// 1. The snapshot for the previous consuming segment might not be taken since it is not persisted yet
// 2. If the previous consuming segment is dropped but immutable segment is not downloaded and replaced yet,
// it might cause inconsistency (especially for partial upsert because events are not consumed in sequence)
// To address this problem, we should consider releasing the consumer semaphore after the consuming segment is
// persisted.
// Take upsert snapshot before starting consuming events
if (_partitionUpsertMetadataManager != null) {
if (_partitionUpsertMetadataManager.getContext().getMetadataTTL() > 0) {
// If upsertMetadataTTL is enabled, we will remove expired primary keys from upsertMetadata
// AFTER taking a snapshot. Taking the snapshot first is crucial to capture the final
// state of each key before it exits the TTL window. Out-of-TTL segments are skipped in
// the doAddSegment flow, and the snapshot is used to enableUpsert on the immutable out-of-TTL segment.
// If no snapshot is found, the entire segment is marked as valid and queryable.
_partitionUpsertMetadataManager.takeSnapshot();
_partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
} else {
        // We should remove deleted keys first and then take a snapshot. This is because the deletedKeysTTL
        // flow removes keys from the map and removes them from the valid doc IDs. By taking the snapshot
        // immediately after this process, we save one commit cycle, ensuring that the removal of valid doc
        // IDs is reflected immediately.
_partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
_partitionUpsertMetadataManager.takeSnapshot();
}
}
if (_partitionDedupMetadataManager != null
&& _partitionDedupMetadataManager.getContext().getMetadataTTL() > 0) {
_partitionDedupMetadataManager.removeExpiredPrimaryKeys();
}
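    // Main state-machine loop: consume until the end criteria are met, report segmentConsumed() to the
    // controller, then act on the controller's response until the segment reaches a final state.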
while (!_state.isFinal()) {
if (_state.shouldConsume()) {
        consumeLoop(); // Consume until we reach the end criteria, or we are stopped.
}
_serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
if (_shouldStop) {
break;
}
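      // Record how long the phase that just finished (initial consumption or catch-up) took.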
if (_state == State.INITIAL_CONSUMING) {
initialConsumptionEnd = now();
_serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LAST_REALTIME_SEGMENT_INITIAL_CONSUMPTION_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(initialConsumptionEnd - _startTimeMs));
} else if (_state == State.CATCHING_UP) {
catchUpTimeMillis += now() - lastCatchUpStart;
_serverMetrics
.setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(catchUpTimeMillis));
}
// If we are sending segmentConsumed() to the controller, we are in HOLDING state.
_state = State.HOLDING;
SegmentCompletionProtocol.Response response = postSegmentConsumedMsg();
SegmentCompletionProtocol.ControllerResponseStatus status = response.getStatus();
switch (status) {
case NOT_LEADER:
// Retain the same state
_segmentLogger.warn("Got not leader response");
hold();
break;
case CATCH_UP:
StreamPartitionMsgOffset rspOffset = extractOffset(response);
if (rspOffset.compareTo(_currentOffset) <= 0) {
            // Something is wrong with the controller. Back off and try again.
_segmentLogger.error("Invalid catchup offset {} in controller response, current offset {}", rspOffset,
_currentOffset);
hold();
} else {
_state = State.CATCHING_UP;
_finalOffset = rspOffset;
lastCatchUpStart = now();
// We will restart consumption when we loop back above.
}
break;
case HOLD:
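          // Controller wants us to hold the current offset and report again with another segmentConsumed().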
hold();
break;
case DISCARD:
          // Keep the segment in memory, but wait for the ONLINE transition, and download the committed
          // segment when it comes in.
_state = State.DISCARDED;
break;
case KEEP: {
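          // Our offset matches the committed end offset: build the segment locally and retain it (unless
          // the completion mode forces a download).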
if (_segmentCompletionMode == CompletionMode.DOWNLOAD) {
_state = State.DISCARDED;
break;
}
_state = State.RETAINING;
// Lock the segment to avoid multiple threads touching the same segment.
Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr);
// NOTE: We need to lock interruptibly because the lock might already be held by the Helix thread for the
// CONSUMING -> ONLINE state transition.
segmentLock.lockInterruptibly();
try {
if (buildSegmentAndReplace()) {
_state = State.RETAINED;
} else {
// Could not build segment for some reason. We can only download it.
_state = State.ERROR;
_segmentLogger.error("Could not build segment for {}", _segmentNameStr);
}
} finally {
segmentLock.unlock();
}
break;
}
case COMMIT: {
_state = State.COMMITTING;
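          // Checkpoint the consumer before building the segment, so the commit offset reflects all
          // consumed data.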
_currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
// Lock the segment to avoid multiple threads touching the same segment.
Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr);
// NOTE: We need to lock interruptibly because the lock might already be held by the Helix thread for the
// CONSUMING -> ONLINE state transition.
segmentLock.lockInterruptibly();
try {
            // For tables with pauseless consumption enabled, we want to start the commit protocol that:
            // 1. Updates the endOffset in the ZK metadata for the committing segment
            // 2. Creates ZK metadata for the new consuming segment
            // 3. Updates the IdealState for the committing and new consuming segments to ONLINE and
            //    CONSUMING respectively.
            // Refer to the PR for the new commit protocol: https://github.com/apache/pinot/pull/14741
if (PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
_serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.PAUSELESS_CONSUMPTION_ENABLED, 1);
if (!startSegmentCommit()) {
// If for any reason commit failed, we don't want to be in COMMITTING state when we hold.
// Change the state to HOLDING before looping around.
_state = State.HOLDING;
_segmentLogger.info("Could not commit segment: {}. Retrying after hold", _segmentNameStr);
hold();
break;
}
} else {
_serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.PAUSELESS_CONSUMPTION_ENABLED, 0);
}
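            // Build the segment within the time budget allotted by the controller.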
long buildTimeSeconds = response.getBuildTimeSeconds();
buildSegmentForCommit(buildTimeSeconds * 1000L);
if (_segmentBuildDescriptor == null) {
// We could not build the segment. Go into error state.
_state = State.ERROR;
_segmentLogger.error("Could not build segment for {}", _segmentNameStr);
              if (_segmentBuildFailedWithDeterministicError && _tableConfig.getIngestionConfig() != null
                  && _tableConfig.getIngestionConfig().isRetryOnSegmentBuildPrecheckFailure()) {
                _segmentLogger.error(
                    "Found non-recoverable segment build error for {}, from offset {} to {}, "
                        + "sending notifyCannotBuild event.",
                    _segmentNameStr, _startOffset, _currentOffset);
notifySegmentBuildFailedWithDeterministicError();
}
break;
}
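            // Commit the built segment with the controller; on success this segment is done.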
if (commitSegment(response.getControllerVipUrl())) {
_state = State.COMMITTED;
break;
}
} finally {
segmentLock.unlock();
}
// If for any reason commit failed, we don't want to be in COMMITTING state when we hold.
// Change the state to HOLDING before looping around.
_state = State.HOLDING;
_segmentLogger.info("Could not commit segment. Retrying after hold");
hold();
break;
}
default:
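          // Unexpected response from the controller: hold and retry.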
_segmentLogger.error("Holding after response from Controller: {}", response.toJsonString());
hold();
break;
}
}
} catch (Throwable e) {
if (_shouldStop) {
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("Caught exception in consumer thread after stop() is invoked, ignoring the exception",
e);
} else {
_segmentLogger.info(
"Caught exception in consumer thread after stop() is invoked: {}, ignoring the exception",
e.getMessage());
}
} else {
        String errorMessage = "Exception while consuming";
_segmentLogger.error(errorMessage, e);
_state = State.ERROR;
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
_serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
postStopConsumedMsg(e.getClass().getName());
return;
}
}
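  // Consumer thread is exiting: clean up any local segment build artifacts.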
removeSegmentFile();
if (initialConsumptionEnd != 0L) {
_serverMetrics
.setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(now() - initialConsumptionEnd));
}
  // There is a race condition where destroy() can be called, which ends up calling stop() on the consumer.
  // The destroy() method does not wait for the thread to terminate (and reasonably so; we don't want to wait
  // forever). Since the _shouldStop variable is set to true only in the stop() method, we know that the
  // metric will be cleaned up, so it is ok not to mark it non-consuming here: the main thread will clean up
  // this metric in the destroy() method as the final step.
if (!_shouldStop) {
_serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
}
}