public RealtimeSegmentDataManager()

in pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java [1587:1768]


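  /**
   * Creates the data manager for a consuming segment: decodes stream/partition info from the segment name and
   * ZK metadata, sets up the stream consumer, message decoder and transform pipeline, allocates the in-memory
   * mutable segment, and arranges for the segment to be marked OFFLINE if initialization fails.
   */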
  public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
      RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
      Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator consumerCoordinator,
      ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager,
      @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isReadyToConsumeData)
      throws AttemptsExceededException, RetriableOperationException {
    _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
    _segmentZKMetadata = segmentZKMetadata;
    _tableConfig = tableConfig;
    _tableNameWithType = _tableConfig.getTableName();
    _realtimeTableDataManager = realtimeTableDataManager;
    _resourceDataDir = resourceDataDir;
    _schema = schema;
    _llcSegmentName = llcSegmentName;
    _consumerCoordinator = consumerCoordinator;
    _serverMetrics = serverMetrics;
    _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
    _partitionDedupMetadataManager = partitionDedupMetadataManager;
    _isReadyToConsumeData = isReadyToConsumeData;
    _segmentVersion = indexLoadingConfig.getSegmentVersion();
    _instanceId = _realtimeTableDataManager.getInstanceId();
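    // The lease extender keeps the segment-build lease with the controller alive while a segment build is in
    // progress; the protocol handler drives the segment completion protocol against the controller.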
    _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
    CompletionConfig completionConfig = _tableConfig.getValidationConfig().getCompletionConfig();
    _segmentCompletionMode = completionConfig != null
        && CompletionMode.DOWNLOAD.toString().equalsIgnoreCase(completionConfig.getCompletionMode())
        ? CompletionMode.DOWNLOAD : CompletionMode.DEFAULT;
    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
    _trackFilteredMessageOffsets = ingestionConfig != null && ingestionConfig.getStreamIngestionConfig() != null
        && ingestionConfig.getStreamIngestionConfig().isTrackFilteredMessageOffsets();
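    // Policy deciding whether this segment may keep consuming in parallel with segment build/download
    // (derived from the table config).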
    _parallelSegmentConsumptionPolicy = getParallelConsumptionPolicy();

    String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
    // TODO Validate configs
    IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
    _partitionGroupId = llcSegmentName.getPartitionGroupId();
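    // The Pinot partition group id encodes both the stream partition id and the index into the list of stream
    // configs (for tables ingesting from multiple streams/topics); decode both pieces here.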
    _streamPartitionGroupId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId);
    _streamConfig = new StreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMaps(_tableConfig)
        .get(IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId)));
    _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
    _streamPartitionMsgOffsetFactory = _streamConsumerFactory.createStreamMsgOffsetFactory();
    String streamTopic = _streamConfig.getTopicName();
    _segmentNameStr = _segmentZKMetadata.getSegmentName();
    _partitionGroupConsumptionStatus =
        new PartitionGroupConsumptionStatus(_partitionGroupId, llcSegmentName.getSequenceNumber(),
            _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset()),
            _segmentZKMetadata.getEndOffset() == null ? null
                : _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()),
            _segmentZKMetadata.getStatus().toString());
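    // Build the consumer client id as <table>-<topic>-<streamPartitionId>, optionally appending a configured
    // suffix (e.g. to tell apart consumers from different server instances).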
    InstanceDataManagerConfig instanceDataManagerConfig = indexLoadingConfig.getInstanceDataManagerConfig();
    String clientIdSuffix =
        instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
    if (StringUtils.isNotBlank(clientIdSuffix)) {
      _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPartitionGroupId + "-" + clientIdSuffix;
    } else {
      _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPartitionGroupId;
    }
    _segmentLogger = LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
    _tableStreamName = _tableNameWithType + "_" + streamTopic;
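    // Choose the memory manager for the mutable segment: memory-mapped buffers under the consumer dir when
    // off-heap (non-direct) allocation is configured, direct allocation otherwise.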
    if (indexLoadingConfig.isRealtimeOffHeapAllocation() && !indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) {
      _memoryManager =
          new MmapMemoryManager(_realtimeTableDataManager.getConsumerDir(), _segmentNameStr, _serverMetrics);
    } else {
      // For on-heap allocation, we still need a memory manager for forward index.
      // Dictionary will be allocated on heap.
      _memoryManager = new DirectMemoryManager(_segmentNameStr, _serverMetrics);
    }

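    // Two levels of throttling: a per-partition rate limiter built from the stream config, and a server-wide
    // rate limiter shared across all consuming segments.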
    _partitionRateLimiter = RealtimeConsumptionRateManager.getInstance()
        .createRateLimiter(_streamConfig, _tableNameWithType, _serverMetrics, _clientId);
    _serverRateLimiter = RealtimeConsumptionRateManager.getInstance().getServerRateLimiter();

    // Read the max number of rows: prefer the flush threshold from segment ZK metadata, fall back to the stream
    // config, then to the default.
    int segmentMaxRowCount = segmentZKMetadata.getSizeThresholdToFlushSegment();
    if (segmentMaxRowCount <= 0) {
      segmentMaxRowCount = _streamConfig.getFlushThresholdRows();
    }
    if (segmentMaxRowCount <= 0) {
      segmentMaxRowCount = StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS;
    }
    _segmentMaxRowCount = segmentMaxRowCount;

    _isOffHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
    _defaultNullHandlingEnabled = indexingConfig.isNullHandlingEnabled();

    // Start new realtime segment
    String consumerDir = realtimeTableDataManager.getConsumerDir();
    RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig)
        .setTableNameWithType(_tableNameWithType)
        .setSegmentName(_segmentNameStr)
        .setStreamName(streamTopic)
        .setSchema(_schema)
        .setTimeColumnName(timeColumnName)
        .setCapacity(_segmentMaxRowCount)
        .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
        .setSegmentZKMetadata(segmentZKMetadata)
        .setOffHeap(_isOffHeap)
        .setMemoryManager(_memoryManager)
        .setStatsHistory(realtimeTableDataManager.getStatsHistory())
        .setAggregateMetrics(indexingConfig.isAggregateMetrics())
        .setIngestionAggregationConfigs(IngestionConfigUtils.getAggregationConfigs(tableConfig))
        .setDefaultNullHandlingEnabled(_defaultNullHandlingEnabled)
        .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
        .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
        .setConsumerDir(consumerDir);

    // Create message decoder
    Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig, _schema);
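    // Decoder creation can fail transiently (e.g. if a schema registry is unavailable), so retry up to 5 times
    // with exponential backoff starting at 1s and growing 1.2x per attempt.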
    RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f);
    AtomicReference<StreamDataDecoder> localStreamDataDecoder = new AtomicReference<>();
    try {
      retryPolicy.attempt(() -> {
        try {
          StreamMessageDecoder streamMessageDecoder = createMessageDecoder(fieldsToRead);
          localStreamDataDecoder.set(new StreamDataDecoderImpl(streamMessageDecoder));
          return true;
        } catch (Exception e) {
          _segmentLogger.warn("Failed to initialize the StreamMessageDecoder: ", e);
          return false;
        }
      });
    } catch (Exception e) {
      _realtimeTableDataManager.addSegmentError(_segmentNameStr,
          new SegmentErrorInfo(now(), "Failed to initialize the StreamMessageDecoder", e));
      throw e;
    }
    _streamDataDecoder = localStreamDataDecoder.get();

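    // The transform pipeline applies the table's ingestion transforms to every decoded row before indexing.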
    try {
      _transformPipeline = new TransformPipeline(tableConfig, schema);
    } catch (Exception e) {
      _realtimeTableDataManager.addSegmentError(_segmentNameStr,
          new SegmentErrorInfo(now(), "Failed to initialize the TransformPipeline", e));
      throw e;
    }

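    // Wire everything together: create the stream consumer and partition metadata provider, build the in-memory
    // mutable segment, and set the consumption end criteria. Any failure here is recorded as a segment error and
    // handled by the catch block below.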
    try {
      _startOffset = _partitionGroupConsumptionStatus.getStartOffset();
      _currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset);
      makeStreamConsumer("Starting");
      createPartitionMetadataProvider("Starting");
      setPartitionParameters(realtimeSegmentConfigBuilder, indexingConfig.getSegmentPartitionConfig());
      _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), serverMetrics);
      _resourceTmpDir = new File(resourceDataDir, RESOURCE_TEMP_DIR_NAME);
      if (!_resourceTmpDir.exists()) {
        _resourceTmpDir.mkdirs();
      }
      _state = State.INITIAL_CONSUMING;
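      // Record the stream's latest offset at startup (fetched with a 5s timeout), e.g. for catch-up/lag reporting.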
      _latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000);
      setConsumeEndTime(segmentZKMetadata, now());
      _segmentCommitterFactory =
          new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
    } catch (Throwable t) {
      // If an exception is thrown here, the segment goes to ERROR state. Any subsequent attempt to reset the
      // segment from ERROR -> OFFLINE -> CONSUMING via Helix Admin then fails because the semaphore was acquired
      // but not released. Hence we release the semaphore here to unblock the reset operation via Helix Admin.
      _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
          "Failed to initialize segment data manager", t));
      _segmentLogger.warn(
          "Scheduling task to call controller to mark the segment as OFFLINE in Ideal State due to initialization"
              + " error: '{}'", t.getMessage());
      // Since we are going to throw an exception from this thread (the Helix execution thread), the external view
      // entry for this segment will become ERROR. We allow time for Helix to make this transition, and then invoke
      // the controller API to mark the segment OFFLINE in the ideal state.
      new Thread(() -> {
        ConsumptionStopIndicator indicator = new ConsumptionStopIndicator(_currentOffset, _segmentNameStr, _instanceId,
            _protocolHandler, "Consuming segment initialization error", _segmentLogger);
        try {
          // Allow 30s for Helix to mark the current state and external view as ERROR, because we are about to
          // receive an ERROR -> OFFLINE state transition once we call postSegmentStoppedConsuming().
          Thread.sleep(30_000);
          indicator.postSegmentStoppedConsuming();
        } catch (InterruptedException ie) {
          // We were interrupted while trying to post the stop-consumed message. Give up at this point.
          return;
        }
      }).start();
      throw t;
    }
  }