public void run()

in pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java [732:972]


    public void run() {
      long initialConsumptionEnd = 0L;
      long lastCatchUpStart = 0L;
      long catchUpTimeMillis = 0L;
      _startTimeMs = now();
      try {
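        // Wait until the server is ready to consume data, polling at a fixed interval; bail out of the wait
        // as soon as this consumer is asked to stop.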
        if (!_isReadyToConsumeData.getAsBoolean()) {
          do {
            //noinspection BusyWait
            Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
          } while (!_shouldStop && !_isReadyToConsumeData.getAsBoolean());
        }

        // Acquire semaphore before consuming data
        try {
          _consumerCoordinator.acquire(_llcSegmentName);
        } catch (ConsumerCoordinator.ShouldNotConsumeException e) {
          _segmentLogger.info("Skipping consumption because: {}", e.getMessage());
          return;
        }
        _consumerSemaphoreAcquired.set(true);
        _consumerCoordinator.register(_llcSegmentName);

        _segmentLogger.info("Acquired consumer semaphore.");

        _consumeStartTime = now();
        _segmentLogger.info("Starting consumption on segment: {}, maxRowCount: {}, maxEndTime: {}.", _llcSegmentName,
            _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC));

        // TODO:
        //   When reaching here, the current consuming segment has already acquired the consumer semaphore, but there is
        //   no guarantee that the previous consuming segment is already persisted (replaced with immutable segment). It
        //   can potentially cause the following problems:
        //   1. The snapshot for the previous consuming segment might not be taken since it is not persisted yet
        //   2. If the previous consuming segment is dropped but the immutable segment is not downloaded and replaced
        //      yet, it might cause inconsistency (especially for partial upsert because events are not consumed in
        //      sequence)
        //   To address this problem, we should consider releasing the consumer semaphore after the consuming segment is
        //   persisted.
        // Take an upsert snapshot before starting to consume events
        if (_partitionUpsertMetadataManager != null) {
          if (_partitionUpsertMetadataManager.getContext().getMetadataTTL() > 0) {
            // If upsertMetadataTTL is enabled, we will remove expired primary keys from upsertMetadata
            // AFTER taking a snapshot. Taking the snapshot first is crucial to capture the final
            // state of each key before it exits the TTL window. Out-of-TTL segments are skipped in
            // the doAddSegment flow, and the snapshot is used to enableUpsert on the immutable out-of-TTL segment.
            // If no snapshot is found, the entire segment is marked as valid and queryable.
            _partitionUpsertMetadataManager.takeSnapshot();
            _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
          } else {
            // We should remove deleted keys first and then take a snapshot. This is because the deletedKeysTTL
            // flow removes keys from the map and updates the valid doc IDs to drop them. By taking the snapshot
            // immediately after this step, we save one commit cycle and ensure that the removal of valid doc IDs
            // is reflected immediately.
            _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
            _partitionUpsertMetadataManager.takeSnapshot();
          }
        }

        // Similarly, purge expired primary keys from the dedup metadata if a dedup metadata TTL is configured.
        if (_partitionDedupMetadataManager != null
            && _partitionDedupMetadataManager.getContext().getMetadataTTL() > 0) {
          _partitionDedupMetadataManager.removeExpiredPrimaryKeys();
        }

        // Main loop: alternate between consuming from the stream and coordinating with the controller until the
        // segment state machine reaches a final state.
        while (!_state.isFinal()) {
          if (_state.shouldConsume()) {
            consumeLoop();  // Consume until we reach the end criteria, or we are stopped.
          }
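          // The consume loop has returned (end criteria reached, or we were asked to stop); clear the consuming
          // gauge while we coordinate the next step with the controller.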
          _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
          if (_shouldStop) {
            break;
          }

          if (_state == State.INITIAL_CONSUMING) {
            initialConsumptionEnd = now();
            _serverMetrics.setValueOfTableGauge(_clientId,
                ServerGauge.LAST_REALTIME_SEGMENT_INITIAL_CONSUMPTION_DURATION_SECONDS,
                TimeUnit.MILLISECONDS.toSeconds(initialConsumptionEnd - _startTimeMs));
          } else if (_state == State.CATCHING_UP) {
            catchUpTimeMillis += now() - lastCatchUpStart;
            _serverMetrics
                .setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
                    TimeUnit.MILLISECONDS.toSeconds(catchUpTimeMillis));
          }

          // If we are sending segmentConsumed() to the controller, we are in HOLDING state.
          _state = State.HOLDING;
          SegmentCompletionProtocol.Response response = postSegmentConsumedMsg();
          SegmentCompletionProtocol.ControllerResponseStatus status = response.getStatus();
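          // Act on the controller's response to segmentConsumed(); depending on the status we hold, catch up,
          // discard, retain (build locally) or commit the segment.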
          switch (status) {
            case NOT_LEADER:
              // Retain the same state
              _segmentLogger.warn("Got not leader response");
              hold();
              break;
            case CATCH_UP:
              StreamPartitionMsgOffset rspOffset = extractOffset(response);
              if (rspOffset.compareTo(_currentOffset) <= 0) {
                // Something wrong with the controller. Back off and try again.
                _segmentLogger.error("Invalid catchup offset {} in controller response, current offset {}", rspOffset,
                    _currentOffset);
                hold();
              } else {
                _state = State.CATCHING_UP;
                _finalOffset = rspOffset;
                lastCatchUpStart = now();
                // We will restart consumption when we loop back above.
              }
              break;
            case HOLD:
              hold();
              break;
            case DISCARD:
              // Keep this in memory, but wait for the online transition, and download when it comes in.
              _state = State.DISCARDED;
              break;
            case KEEP: {
              if (_segmentCompletionMode == CompletionMode.DOWNLOAD) {
                _state = State.DISCARDED;
                break;
              }
              _state = State.RETAINING;
              // Lock the segment to avoid multiple threads touching the same segment.
              Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr);
              // NOTE: We need to lock interruptibly because the lock might already be held by the Helix thread for the
              //       CONSUMING -> ONLINE state transition.
              segmentLock.lockInterruptibly();
              try {
                if (buildSegmentAndReplace()) {
                  _state = State.RETAINED;
                } else {
                  // Could not build segment for some reason. We can only download it.
                  _state = State.ERROR;
                  _segmentLogger.error("Could not build segment for {}", _segmentNameStr);
                }
              } finally {
                segmentLock.unlock();
              }
              break;
            }
            case COMMIT: {
              _state = State.COMMITTING;
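              // Checkpoint the stream consumer before committing; the consumer may return an updated offset that
              // should be used as the committed offset.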
              _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
              // Lock the segment to avoid multiple threads touching the same segment.
              Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr);
              // NOTE: We need to lock interruptibly because the lock might already be held by the Helix thread for the
              //       CONSUMING -> ONLINE state transition.
              segmentLock.lockInterruptibly();
              try {
                // For tables with pauseless consumption enabled, we want to start the commit protocol that
                // 1. Updates the endOffset in the ZK metadata for the committing segment
                // 2. Creates ZK metadata for the new consuming segment
                // 3. Updates the IdealState for committing and new consuming segment to ONLINE and CONSUMING
                // respectively.
                // Refer to the PR for the new commit protocol: https://github.com/apache/pinot/pull/14741
                if (PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
                  _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.PAUSELESS_CONSUMPTION_ENABLED, 1);
                  if (!startSegmentCommit()) {
                    // If for any reason commit failed, we don't want to be in COMMITTING state when we hold.
                    // Change the state to HOLDING before looping around.
                    _state = State.HOLDING;
                    _segmentLogger.info("Could not commit segment: {}. Retrying after hold", _segmentNameStr);
                    hold();
                    break;
                  }
                } else {
                  _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.PAUSELESS_CONSUMPTION_ENABLED, 0);
                }
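                // Build the segment locally; the controller's response carries the time budget allowed for the build.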
                long buildTimeSeconds = response.getBuildTimeSeconds();
                buildSegmentForCommit(buildTimeSeconds * 1000L);
                if (_segmentBuildDescriptor == null) {
                  // We could not build the segment. Go into error state.
                  _state = State.ERROR;
                  _segmentLogger.error("Could not build segment for {}", _segmentNameStr);
                  if (_segmentBuildFailedWithDeterministicError
                      && _tableConfig.getIngestionConfig().isRetryOnSegmentBuildPrecheckFailure()) {
                    _segmentLogger.error(
                        "Found non-recoverable segment build error for {}, from offset {} to {},"
                            + "sending notifyCannotBuild event.",
                        _segmentNameStr, _startOffset, _currentOffset);
                    notifySegmentBuildFailedWithDeterministicError();
                  }
                  break;
                }
                if (commitSegment(response.getControllerVipUrl())) {
                  _state = State.COMMITTED;
                  break;
                }
              } finally {
                segmentLock.unlock();
              }
              // If for any reason commit failed, we don't want to be in COMMITTING state when we hold.
              // Change the state to HOLDING before looping around.
              _state = State.HOLDING;
              _segmentLogger.info("Could not commit segment. Retrying after hold");
              hold();
              break;
            }
            default:
              _segmentLogger.error("Holding after response from Controller: {}", response.toJsonString());
              hold();
              break;
          }
        }
      } catch (Throwable e) {
        if (_shouldStop) {
          if (_segmentLogger.isDebugEnabled()) {
            _segmentLogger.debug("Caught exception in consumer thread after stop() is invoked, ignoring the exception",
                e);
          } else {
            _segmentLogger.info(
                "Caught exception in consumer thread after stop() is invoked: {}, ignoring the exception",
                e.getMessage());
          }
        } else {
          String errorMessage = "Exception while in work";
          _segmentLogger.error(errorMessage, e);
          _state = State.ERROR;
          _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
          _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
          postStopConsumedMsg(e.getClass().getName());
          return;
        }
      }

      // Clean up any locally built segment files; they are no longer needed once the consumer loop has exited.
      removeSegmentFile();

      if (initialConsumptionEnd != 0L) {
        _serverMetrics
            .setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
                TimeUnit.MILLISECONDS.toSeconds(now() - initialConsumptionEnd));
      }
      // There is a race condition where destroy() can be called, which ends up calling stop() on the consumer.
      // The destroy() method does not wait for the thread to terminate (and reasonably so, we don't want to wait
      // forever).
      // Since the _shouldStop variable is set to true only in the stop() method, we know that the metric will be
      // destroyed, so it is OK not to mark it non-consuming here; the main thread will clean up this metric in the
      // destroy() method as the final step.
      if (!_shouldStop) {
        _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
      }
    }