private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context)

in pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java [212:458]


  /**
   * Recomputes and publishes the segment status gauges for a single table.
   *
   * <p>Tables without an ideal state, or whose ideal state is disabled, have their metrics removed instead (disabled
   * tables are also recorded in {@code context._disabledTables}). Paused tables are recorded in
   * {@code context._pausedTables} but are still checked. For every other table this reads the ideal state, external
   * view, segment lineage and per-segment ZK metadata, and sets the znode-size, segment-count, replica,
   * availability, error, compressed-size and time-validity gauges. Real-time tables with a non-null config are
   * additionally checked for missing consuming segments.
   *
   * @param tableNameWithType table name including the type suffix
   * @param tableConfig table config; when {@code null} the missing-consuming-segment check is skipped
   * @param context per-run context that collects disabled and paused tables
   */
  private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) {
    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);

    IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);

    if (idealState == null) {
      LOGGER.warn("Table {} has null ideal state. Skipping segment status checks", tableNameWithType);
      removeMetricsForTable(tableNameWithType);
      return;
    }

    if (!idealState.isEnabled()) {
      if (context._logDisabledTables) {
        LOGGER.warn("Table {} is disabled. Skipping segment status checks", tableNameWithType);
      }
      removeMetricsForTable(tableNameWithType);
      context._disabledTables.add(tableNameWithType);
      return;
    }

    if (PinotLLCRealtimeSegmentManager.isTablePaused(idealState)) {
      context._pausedTables.add(tableNameWithType);
    }

    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
        idealState.toString().length());
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE,
        idealState.serialize(RECORD_SERIALIZER).length);

    Set<String> segmentsIncludingReplaced = idealState.getPartitionSet();
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED,
        segmentsIncludingReplaced.size());

    ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
    if (propertyStore != null) {
      String segmentsPath = ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType);
      List<String> segmentNames = propertyStore.getChildNames(segmentsPath, AccessOption.PERSISTENT);
      long segmentNamesBytesSize = 0;
      if (segmentNames != null) {
        for (String segmentName : segmentNames) {
          // Explicit charset so the reported byte size does not depend on the JVM's platform default encoding.
          segmentNamesBytesSize += segmentName.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
        }
      }
      _controllerMetrics.setValueOfTableGauge(tableNameWithType,
          ControllerGauge.PROPERTYSTORE_SEGMENT_CHILDREN_BYTE_SIZE, segmentNamesBytesSize);
    }

    // Get the segments excluding the replaced segments which are specified in the segment lineage entries and cannot
    // be queried from the table.
    Set<String> segments;
    if (segmentsIncludingReplaced.isEmpty()) {
      segments = Set.of();
    } else {
      segments = new HashSet<>(segmentsIncludingReplaced);
      // Lineage is read from the property store; when the store is unavailable, skip lineage filtering rather than
      // handing a null store to the lineage reader.
      if (propertyStore != null) {
        SegmentLineage segmentLineage =
            SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType);
        SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segments, segmentLineage);
      }
    }
    int numSegments = segments.size();
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT, numSegments);
    if (numSegments == 0) {
      // Nothing to check: report a healthy table using the replication configured in the ideal state.
      _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS,
          parseNumReplicasFromIdealState(idealState));
      _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
      _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0);
      _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
      _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS, 0);
      _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE, 0);
      // Also reset the time-validity gauges; otherwise counts from an earlier run that still had segments linger.
      _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME, 0);
      _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME, 0);
      return;
    }

    ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType);
    if (externalView != null) {
      _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.EXTERNALVIEW_ZNODE_SIZE,
          externalView.toString().length());
      _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.EXTERNALVIEW_ZNODE_BYTE_SIZE,
          externalView.serialize(RECORD_SERIALIZER).length);
    }

    ServerQueryInfoFetcher serverQueryInfoFetcher = new ServerQueryInfoFetcher(_pinotHelixResourceManager);
    // Segments created/pushed after this instant are still expected to be loading on servers and are skipped.
    // Computed once so every segment is judged against the same point in time.
    long recentSegmentThresholdMs = System.currentTimeMillis() - _waitForPushTimeSeconds * 1000L;

    // Maximum number of replicas that is up (ONLINE/CONSUMING) in ideal state
    int maxISReplicasUp = Integer.MIN_VALUE;
    // Minimum number of replicas that is up (ONLINE/CONSUMING) in external view
    int minEVReplicasUp = Integer.MAX_VALUE;
    // Minimum percentage of replicas that is up (ONLINE/CONSUMING) in external view
    int minEVReplicasUpPercent = 100;
    // Total compressed segment size in deep store
    long tableCompressedSize = 0;
    // Segments without ZK metadata
    List<String> segmentsWithoutZKMetadata = new ArrayList<>();
    // Pairs of segment-instance in ERROR state
    List<Pair<String, String>> errorSegments = new ArrayList<>();
    // Offline segments
    List<String> offlineSegments = new ArrayList<>();
    // Segments with fewer replicas online (ONLINE/CONSUMING) in external view than in ideal state
    List<String> partialOnlineSegments = new ArrayList<>();
    List<String> segmentsInvalidStartTime = new ArrayList<>();
    List<String> segmentsInvalidEndTime = new ArrayList<>();
    for (String segment : segments) {
      Map<String, String> idealInstanceStateMap = idealState.getInstanceStateMap(segment);
      // Number of replicas in ideal state that is in ONLINE/CONSUMING state
      int numISReplicasUp = 0;
      for (String state : idealInstanceStateMap.values()) {
        if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
          numISReplicasUp++;
        }
      }
      // Skip segments with no ONLINE/CONSUMING in ideal state
      if (numISReplicasUp == 0) {
        continue;
      }
      maxISReplicasUp = Math.max(maxISReplicasUp, numISReplicasUp);

      SegmentZKMetadata segmentZKMetadata = _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment);
      // Skip the segment when it doesn't have ZK metadata. Most likely the segment is just deleted.
      if (segmentZKMetadata == null) {
        segmentsWithoutZKMetadata.add(segment);
        continue;
      }
      long sizeInBytes = segmentZKMetadata.getSizeInBytes();
      if (sizeInBytes > 0) {
        tableCompressedSize += sizeInBytes;
      }

      // NOTE: We want to skip segments that are just created/pushed to avoid false alerts because it is expected for
      //       servers to take some time to load them. For consuming (IN_PROGRESS) segments, we use creation time from
      //       the ZK metadata; for pushed segments, we use push time from the ZK metadata. Both of them are the time
      //       when segment is newly created. For committed segments from real-time table, push time doesn't exist, and
      //       creationTimeMs will be Long.MIN_VALUE, which is fine because we want to include them in the check.
      long creationTimeMs = segmentZKMetadata.getStatus() == Status.IN_PROGRESS ? segmentZKMetadata.getCreationTime()
          : segmentZKMetadata.getPushTime();
      if (creationTimeMs > recentSegmentThresholdMs) {
        continue;
      }

      if (segmentZKMetadata.getStatus() != Status.IN_PROGRESS) {
        if (!TimeUtils.timeValueInValidRange(segmentZKMetadata.getStartTimeMs())) {
          segmentsInvalidStartTime.add(segment);
        }
        if (!TimeUtils.timeValueInValidRange(segmentZKMetadata.getEndTimeMs())) {
          segmentsInvalidEndTime.add(segment);
        }
      }

      // Count replicas that are ONLINE/CONSUMING in the external view AND hosted on a queryable server.
      int numEVReplicasUp = 0;
      if (externalView != null) {
        Map<String, String> stateMap = externalView.getStateMap(segment);
        if (stateMap != null) {
          for (Map.Entry<String, String> entry : stateMap.entrySet()) {
            String serverInstanceId = entry.getKey();
            String segmentState = entry.getValue();
            if ((segmentState.equals(SegmentStateModel.ONLINE) || segmentState.equals(SegmentStateModel.CONSUMING))
                && isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId))) {
              numEVReplicasUp++;
            }
            if (segmentState.equals(SegmentStateModel.ERROR)) {
              errorSegments.add(Pair.of(segment, serverInstanceId));
            }
          }
        }
      }
      if (numEVReplicasUp == 0) {
        offlineSegments.add(segment);
      } else if (numEVReplicasUp < numISReplicasUp) {
        partialOnlineSegments.add(segment);
      } else {
        // Do not allow numEVReplicasUp to be larger than numISReplicasUp
        numEVReplicasUp = numISReplicasUp;
      }

      minEVReplicasUp = Math.min(minEVReplicasUp, numEVReplicasUp);
      // Total number of replicas in ideal state (including ERROR/OFFLINE states); floor of 1 avoids dividing by 0
      int numISReplicasTotal = Math.max(idealInstanceStateMap.size(), 1);
      minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicasTotal);
    }

    if (maxISReplicasUp == Integer.MIN_VALUE) {
      // No segment had an ONLINE/CONSUMING replica in the ideal state; fall back to the configured replication.
      maxISReplicasUp = parseNumReplicasFromIdealState(idealState);
    }

    // Do not allow minEVReplicasUp to be larger than maxISReplicasUp
    minEVReplicasUp = Math.min(minEVReplicasUp, maxISReplicasUp);

    int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size();
    if (numSegmentsWithoutZKMetadata > 0) {
      LOGGER.warn("Table {} has {} segments without ZK metadata: {}", tableNameWithType, numSegmentsWithoutZKMetadata,
          logSegments(segmentsWithoutZKMetadata));
    }
    int numErrorSegments = errorSegments.size();
    if (numErrorSegments > 0) {
      LOGGER.warn("Table {} has {} segments in ERROR state: {}", tableNameWithType, numErrorSegments,
          logSegments(errorSegments));
    }
    int numOfflineSegments = offlineSegments.size();
    if (numOfflineSegments > 0) {
      LOGGER.warn("Table {} has {} segments without ONLINE/CONSUMING replica: {}", tableNameWithType,
          numOfflineSegments, logSegments(offlineSegments));
    }
    int numPartialOnlineSegments = partialOnlineSegments.size();
    if (numPartialOnlineSegments > 0) {
      LOGGER.warn("Table {} has {} segments with fewer replicas than the replication factor: {}", tableNameWithType,
          numPartialOnlineSegments, logSegments(partialOnlineSegments));
    }
    int numInvalidStartTime = segmentsInvalidStartTime.size();
    if (numInvalidStartTime > 0) {
      LOGGER.warn("Table {} has {} segments with invalid start time: {}", tableNameWithType, numInvalidStartTime,
          logSegments(segmentsInvalidStartTime));
    }
    int numInvalidEndTime = segmentsInvalidEndTime.size();
    if (numInvalidEndTime > 0) {
      LOGGER.warn("Table {} has {} segments with invalid end time: {}", tableNameWithType, numInvalidEndTime,
          logSegments(segmentsInvalidEndTime));
    }

    // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicasUp);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
        minEVReplicasUpPercent);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE,
        numErrorSegments);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
        numOfflineSegments > 0 ? (numSegments - numOfflineSegments) * 100L / numSegments : 100);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS,
        numPartialOnlineSegments);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE,
        tableCompressedSize);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME,
        numInvalidStartTime);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME,
        numInvalidEndTime);

    if (tableType == TableType.REALTIME && tableConfig != null) {
      List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
      new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics,
          streamConfigs).findAndEmitMetrics(idealState);
    }
  }

  /**
   * Returns the replica count configured on the ideal state, or 1 when it is null, not a number, or below 1.
   *
   * @param idealState the table's ideal state
   * @return the configured replication, never less than 1
   */
  private static int parseNumReplicasFromIdealState(IdealState idealState) {
    try {
      return Math.max(Integer.parseInt(idealState.getReplicas()), 1);
    } catch (NumberFormatException e) {
      // Integer.parseInt also throws NumberFormatException for a null string, covering an unset replica count.
      return 1;
    }
  }