IdealState ensureAllPartitionsConsuming()

in pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java [1455:1655]


  /**
   * Checks the latest segment of every partition of a realtime table and repairs the given ideal state so that each
   * live partition ends up with a CONSUMING segment.
   *
   * <p>The method works in two passes:
   * <ol>
   *   <li>For every partition that already has segment ZK metadata (as returned by
   *       {@code getLatestSegmentZKMetadataMap}), compare the latest segment's ZK metadata status with its entry in
   *       the ideal state and repair the mismatches listed in the inline comment below.</li>
   *   <li>For every partition in {@code partitionGroupMetadataList} that has no segment ZK metadata at all, set up a
   *       new partition group and register its first segment.</li>
   * </ol>
   *
   * <p>The map obtained from {@code idealState.getRecord().getMapFields()} is handed to the helper methods, which
   * are expected to update it in place (NOTE(review): helper bodies are not visible here - confirm). New segment ZK
   * metadata is created through the helpers as part of the repair.
   *
   * @param tableConfig config of the realtime table; its table name is used as the realtime table name
   * @param streamConfigs stream configs of the table; only the first element is used to build the offset factory
   *     and to create new segment metadata, while the whole list is passed on when the smallest offsets have to be
   *     fetched
   * @param idealState ideal state of the table, read and repaired through its map fields
   * @param partitionGroupMetadataList partitions to consume together with their start offsets; a partition with
   *     existing ZK metadata that is missing from this list is treated as having reached end of life when its
   *     latest segment is still CONSUMING in the ideal state
   * @param offsetCriteria offset criteria requested by the caller, may be {@code null}; when it equals
   *     {@code OffsetCriteria.SMALLEST_OFFSET_CRITERIA} the start offsets of {@code partitionGroupMetadataList} are
   *     reused as the smallest offsets instead of being fetched separately
   * @return the same {@code idealState} instance that was passed in
   */
  IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamConfig> streamConfigs,
      IdealState idealState, List<PartitionGroupMetadata> partitionGroupMetadataList, OffsetCriteria offsetCriteria) {
    String realtimeTableName = tableConfig.getTableName();

    InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
    int numPartitions = partitionGroupMetadataList.size();

    SegmentAssignment segmentAssignment =
        SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
        Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);

    // Segment name -> (instance -> state) map of the ideal state; this is the structure that gets repaired
    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
    // Only the first stream config is used to build the offset factory
    StreamPartitionMsgOffsetFactory offsetFactory =
        StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();

    // Get the latest segment ZK metadata for each partition
    Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName);

    // Create a map from partition id to start offset
    // TODO: Directly return map from StreamMetadataProvider
    Map<Integer, StreamPartitionMsgOffset> partitionIdToStartOffset = Maps.newHashMapWithExpectedSize(numPartitions);
    for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
      partitionIdToStartOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
    }
    // Create a map from partition id to the smallest stream offset.
    // When the smallest offset was requested, the start offsets above are reused as the smallest offsets
    // (presumably the caller fetched them with that criteria - confirm against caller). Otherwise the map stays null
    // and is fetched lazily, at most once, inside the loop below.
    Map<Integer, StreamPartitionMsgOffset> partitionIdToSmallestOffset = null;
    if (offsetCriteria != null && offsetCriteria.equals(OffsetCriteria.SMALLEST_OFFSET_CRITERIA)) {
      partitionIdToSmallestOffset = partitionIdToStartOffset;
    }

    // Walk over all partitions that we have metadata for, and repair any partitions necessary.
    // Possible things to repair:
    // 1. The latest metadata is in DONE/COMMITTING state, but the idealstate says segment is CONSUMING:
    //    a. Create metadata for next segment and find hosts to assign it to.
    //    b. update current segment in idealstate to ONLINE (only if partition is present in newPartitionGroupMetadata)
    //    c. add new segment in idealstate to CONSUMING on the hosts (only if partition is present in
    //    newPartitionGroupMetadata)
    // 2. The latest metadata is in DONE state, but the idealstate has no segment in CONSUMING state.
    //    a. Create metadata for new IN_PROGRESS segment with startOffset set to latest segments' end offset.
    //    b. Add the newly created segment to idealstate with segment state set to CONSUMING.
    // 3. The latest metadata is IN_PROGRESS, but segment is not there in idealstate.
    //    a. change prev segment to ONLINE in idealstate
    //    b. add latest segment to CONSUMING in idealstate.
    // 4. All instances of a segment are in OFFLINE state.
    //    a. Create a new segment (with the next seq number)
    //       and restart consumption from the same offset (if possible) or a newer offset (if realtime stream does
    //       not have the same offset).
    //       In latter case, report data loss.
    long currentTimeMs = getCurrentTimeMs();

    // This is the expected segment status after completion of first of the 3 steps of the segment commit protocol
    // The status in step one is updated to
    // 1. DONE for normal consumption
    // 2. COMMITTING for pauseless consumption
    Status statusPostSegmentMetadataUpdate =
        PauselessConsumptionUtils.isPauselessEnabled(tableConfig) ? Status.COMMITTING : Status.DONE;

    for (Map.Entry<Integer, SegmentZKMetadata> entry : latestSegmentZKMetadataMap.entrySet()) {
      int partitionId = entry.getKey();
      SegmentZKMetadata latestSegmentZKMetadata = entry.getValue();
      String latestSegmentName = latestSegmentZKMetadata.getSegmentName();
      LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentName);

      Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegmentName);
      if (instanceStateMap != null) {
        // Latest segment of metadata is in idealstate.
        if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
          if (latestSegmentZKMetadata.getStatus() == statusPostSegmentMetadataUpdate) {

            // step-1 of commitSegmentMetadata is done (i.e. marking old segment as DONE/COMMITTING)
            // but step-2 is not done (i.e. adding new metadata for the next segment)
            // and ideal state update (i.e. marking old segment as ONLINE and new segment as CONSUMING) is not done
            // either.
            // Skip the repair unless the max segment completion time has been exceeded (presumably so that a commit
            // which is still in flight is not raced - confirm against isExceededMaxSegmentCompletionTime).
            if (!isExceededMaxSegmentCompletionTime(realtimeTableName, latestSegmentName, currentTimeMs)) {
              continue;
            }
            // A partition is considered alive only if it is present in partitionGroupMetadataList
            if (partitionIdToStartOffset.containsKey(partitionId)) {
              LOGGER.info("Repairing segment: {} which is {} in segment ZK metadata, but is CONSUMING in IdealState",
                  latestSegmentName, statusPostSegmentMetadataUpdate);

              LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
              String newSegmentName = newLLCSegmentName.getSegmentName();
              // The end offset recorded in the latest segment's ZK metadata is used as the committing segment's
              // offset. NOTE(review): the trailing 0 is presumably the segment size in bytes - confirm against
              // CommittingSegmentDescriptor.
              CommittingSegmentDescriptor committingSegmentDescriptor =
                  new CommittingSegmentDescriptor(latestSegmentName,
                      (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
              createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegmentName, currentTimeMs,
                  committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
                  segmentAssignment, instancePartitionsMap);
            } else { // partition group reached end of life
              LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
                      + "Skipping creation of new ZK metadata and new segment in ideal state", partitionId,
                  latestSegmentName);
              // null new segment name: only the committing segment is updated, no new segment is added
              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
                  instancePartitionsMap);
            }
          }
          // else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment.
        } else {
          // No replica in CONSUMING state

          // Possible scenarios: for any of these scenarios, we need to create a new CONSUMING segment unless the stream
          // partition has reached end of life
          // 1. All replicas OFFLINE and metadata IN_PROGRESS/DONE - a segment marked itself OFFLINE during consumption
          //    for some reason
          // 2. All replicas ONLINE and metadata DONE/UPLOADED/COMMITTING
          // 3. We should never end up with some replicas ONLINE and some OFFLINE
          boolean allInstancesOffline = isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE);
          boolean allInstancesOnlineAndMetadataNotInProgress =
              isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE) && (latestSegmentZKMetadata.getStatus()
                  != Status.IN_PROGRESS);
          // Anything other than scenario 1 or 2 is unexpected: log it and leave the partition untouched
          if (!allInstancesOffline && !allInstancesOnlineAndMetadataNotInProgress) {
            LOGGER.error("Got unexpected instance state map: {} for segment: {} with status: {}", instanceStateMap,
                latestSegmentName, latestSegmentZKMetadata.getStatus());
            continue;
          }

          // Smallest offset is fetched from stream once and cached in partitionIdToSmallestOffset.
          if (partitionIdToSmallestOffset == null) {
            partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs, idealState);
          }

          // Do not create new CONSUMING segment when the stream partition has reached end of life.
          if (!partitionIdToSmallestOffset.containsKey(partitionId)) {
            LOGGER.info("PartitionGroup: {} has reached end of life. Skipping creation of new segment {}",
                partitionId, latestSegmentName);
            continue;
          }

          // The two branches below differ only in the log message and in the offset handed to selectStartOffset:
          // the latest segment's start offset when all replicas are OFFLINE, its end offset otherwise.
          if (allInstancesOffline) {
            LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName);
            StreamPartitionMsgOffset startOffset =
                selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset,
                    tableConfig.getTableName(), offsetFactory,
                    latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
            createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs,
                partitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
                instancePartitionsMap, startOffset);
          } else {
            LOGGER.info("Resuming consumption for partition: {} of table: {}", partitionId, realtimeTableName);
            StreamPartitionMsgOffset startOffset =
                selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset,
                    tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getEndOffset());
            createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs,
                partitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
                instancePartitionsMap, startOffset);
          }
        }
      } else {
        // idealstate does not have an entry for the segment (but metadata is present)
        // controller has failed between step-2 and step-3 of commitSegmentMetadata.
        // i.e. after updating old segment metadata (old segment metadata state = DONE/COMMITTING)
        // and creating new segment metadata (new segment metadata state = IN_PROGRESS),
        // but before updating ideal state (new segment is missing from ideal state)
        // Same guard as above: skip the repair unless the max segment completion time has been exceeded.
        if (!isExceededMaxSegmentCompletionTime(realtimeTableName, latestSegmentName, currentTimeMs)) {
          continue;
        }
        LOGGER.info("Repairing segment: {} which has segment ZK metadata but does not exist in IdealState",
            latestSegmentName);

        if (latestSegmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
          // Find the previous CONSUMING segment: the first ideal state entry of this partition that has at least one
          // replica in CONSUMING state
          String previousConsumingSegment = null;
          for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) {
            if (segmentEntry.getValue().containsValue(SegmentStateModel.CONSUMING)
                && new LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() == partitionId) {
              previousConsumingSegment = segmentEntry.getKey();
              break;
            }
          }
          if (previousConsumingSegment == null) {
            LOGGER.error(
                "Failed to find previous CONSUMING segment for partition: {} of table: {}, potential data loss",
                partitionId, realtimeTableName);
            _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
          }
          // The latest segment is still added when no previous CONSUMING segment was found; in that case null is
          // passed as the previous segment name.
          updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, latestSegmentName,
              segmentAssignment, instancePartitionsMap);
        } else {
          // Metadata without an ideal state entry is only expected for an IN_PROGRESS segment; nothing is repaired
          LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}",
              latestSegmentZKMetadata.getStatus(), latestSegmentName);
        }
      }
    }

    // Set up new partitions if not exist: any partition in partitionGroupMetadataList without segment ZK metadata
    // gets a new partition group, and its new segment is registered with no previous segment (null).
    for (PartitionGroupMetadata partitionGroupMetadata : partitionGroupMetadataList) {
      int partitionId = partitionGroupMetadata.getPartitionGroupId();
      if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
        String newSegmentName =
            setupNewPartitionGroup(tableConfig, streamConfigs.get(0), partitionGroupMetadata, currentTimeMs,
                instancePartitions, numPartitions, numReplicas);
        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
            instancePartitionsMap);
      }
    }

    return idealState;
  }