in pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java [212:458]
/**
 * Computes and emits the segment status gauges for a single table.
 *
 * <p>Tables without an ideal state, or whose ideal state is disabled, have their metrics removed and are skipped
 * (disabled tables are additionally recorded in {@code context._disabledTables}). Paused tables are recorded in
 * {@code context._pausedTables} but are still checked. For every other table this emits ZNode size gauges, segment
 * counts (with and without lineage-replaced segments), replica availability, counts of ERROR / offline /
 * under-replicated segments, the total compressed segment size and the number of segments with invalid start/end
 * times. For REALTIME tables with a non-null config, missing consuming segments are also reported.
 *
 * @param tableNameWithType name of the table including its type suffix
 * @param tableConfig table config; when {@code null} the missing-consuming-segment check is skipped
 * @param context per-run context that collects disabled/paused tables and controls disabled-table logging
 */
private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) {
  TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
  IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
  if (idealState == null) {
    LOGGER.warn("Table {} has null ideal state. Skipping segment status checks", tableNameWithType);
    removeMetricsForTable(tableNameWithType);
    return;
  }
  if (!idealState.isEnabled()) {
    if (context._logDisabledTables) {
      LOGGER.warn("Table {} is disabled. Skipping segment status checks", tableNameWithType);
    }
    removeMetricsForTable(tableNameWithType);
    context._disabledTables.add(tableNameWithType);
    return;
  }
  if (PinotLLCRealtimeSegmentManager.isTablePaused(idealState)) {
    context._pausedTables.add(tableNameWithType);
  }
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
      idealState.toString().length());
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE,
      idealState.serialize(RECORD_SERIALIZER).length);
  Set<String> segmentsIncludingReplaced = idealState.getPartitionSet();
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED,
      segmentsIncludingReplaced.size());

  ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
  if (propertyStore != null) {
    // Sum of the byte lengths of the child node names under the table's property store path
    String segmentsPath = ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType);
    List<String> segmentNames = propertyStore.getChildNames(segmentsPath, AccessOption.PERSISTENT);
    long segmentNamesBytesSize = 0;
    if (segmentNames != null) {
      for (String segmentName : segmentNames) {
        // Explicit charset: the no-arg getBytes() depends on the JVM's platform default charset
        segmentNamesBytesSize += segmentName.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
      }
    }
    _controllerMetrics.setValueOfTableGauge(tableNameWithType,
        ControllerGauge.PROPERTYSTORE_SEGMENT_CHILDREN_BYTE_SIZE, segmentNamesBytesSize);
  }

  // Get the segments excluding the replaced segments which are specified in the segment lineage entries and cannot
  // be queried from the table.
  Set<String> segments;
  if (segmentsIncludingReplaced.isEmpty()) {
    segments = Set.of();
  } else {
    segments = new HashSet<>(segmentsIncludingReplaced);
    // NOTE(review): propertyStore is null-checked above but passed unchecked here; presumably it is never null in
    // practice — confirm before relying on the null check above.
    SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType);
    SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segments, segmentLineage);
  }
  int numSegments = segments.size();
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT, numSegments);
  if (numSegments == 0) {
    // Nothing to check: report a healthy table and reset every per-segment counter so that values emitted by a
    // previous run (when the table still had segments) do not linger.
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS,
        getNumReplicasFromIdealState(idealState));
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS, 0);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE, 0);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME, 0);
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME, 0);
    return;
  }

  ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType);
  if (externalView != null) {
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.EXTERNALVIEW_ZNODE_SIZE,
        externalView.toString().length());
    _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.EXTERNALVIEW_ZNODE_BYTE_SIZE,
        externalView.serialize(RECORD_SERIALIZER).length);
  }
  // Used to check whether the server hosting an ONLINE/CONSUMING replica is queryable. Created only once the table
  // is known to have segments to check.
  ServerQueryInfoFetcher serverQueryInfoFetcher = new ServerQueryInfoFetcher(_pinotHelixResourceManager);
  // Segments created/pushed after this instant are still expected to be loading and are skipped below
  long recentSegmentThresholdMs = System.currentTimeMillis() - _waitForPushTimeSeconds * 1000L;

  // Maximum number of replicas that is up (ONLINE/CONSUMING) in ideal state
  int maxISReplicasUp = Integer.MIN_VALUE;
  // Minimum number of replicas that is up (ONLINE/CONSUMING) in external view
  int minEVReplicasUp = Integer.MAX_VALUE;
  // Minimum percentage of replicas that is up (ONLINE/CONSUMING) in external view
  int minEVReplicasUpPercent = 100;
  // Total compressed segment size in deep store
  long tableCompressedSize = 0;
  // Segments without ZK metadata
  List<String> segmentsWithoutZKMetadata = new ArrayList<>();
  // Pairs of segment-instance in ERROR state
  List<Pair<String, String>> errorSegments = new ArrayList<>();
  // Offline segments
  List<String> offlineSegments = new ArrayList<>();
  // Segments with fewer replicas online (ONLINE/CONSUMING) in external view than in ideal state
  List<String> partialOnlineSegments = new ArrayList<>();
  List<String> segmentsInvalidStartTime = new ArrayList<>();
  List<String> segmentsInvalidEndTime = new ArrayList<>();
  for (String segment : segments) {
    Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segment);
    // Number of replicas in ideal state that is in ONLINE/CONSUMING state
    int numISReplicasUp = 0;
    for (String state : instanceStateMap.values()) {
      if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
        numISReplicasUp++;
      }
    }
    // Skip segments with no ONLINE/CONSUMING in ideal state
    if (numISReplicasUp == 0) {
      continue;
    }
    maxISReplicasUp = Math.max(maxISReplicasUp, numISReplicasUp);
    SegmentZKMetadata segmentZKMetadata = _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment);
    // Skip the segment when it doesn't have ZK metadata. Most likely the segment is just deleted.
    if (segmentZKMetadata == null) {
      segmentsWithoutZKMetadata.add(segment);
      continue;
    }
    long sizeInBytes = segmentZKMetadata.getSizeInBytes();
    if (sizeInBytes > 0) {
      tableCompressedSize += sizeInBytes;
    }
    // NOTE: We want to skip segments that are just created/pushed to avoid false alerts because it is expected for
    //       servers to take some time to load them. For consuming (IN_PROGRESS) segments, we use creation time from
    //       the ZK metadata; for pushed segments, we use push time from the ZK metadata. Both of them are the time
    //       when segment is newly created. For committed segments from real-time table, push time doesn't exist, and
    //       creationTimeMs will be Long.MIN_VALUE, which is fine because we want to include them in the check.
    boolean inProgress = segmentZKMetadata.getStatus() == Status.IN_PROGRESS;
    long creationTimeMs = inProgress ? segmentZKMetadata.getCreationTime() : segmentZKMetadata.getPushTime();
    if (creationTimeMs > recentSegmentThresholdMs) {
      continue;
    }
    if (!inProgress) {
      if (!TimeUtils.timeValueInValidRange(segmentZKMetadata.getStartTimeMs())) {
        segmentsInvalidStartTime.add(segment);
      }
      if (!TimeUtils.timeValueInValidRange(segmentZKMetadata.getEndTimeMs())) {
        segmentsInvalidEndTime.add(segment);
      }
    }
    int numEVReplicasUp = 0;
    if (externalView != null) {
      Map<String, String> stateMap = externalView.getStateMap(segment);
      if (stateMap != null) {
        for (Map.Entry<String, String> entry : stateMap.entrySet()) {
          String serverInstanceId = entry.getKey();
          String segmentState = entry.getValue();
          // A replica only counts as up when it is ONLINE/CONSUMING and its server is queryable
          if ((segmentState.equals(SegmentStateModel.ONLINE) || segmentState.equals(SegmentStateModel.CONSUMING))
              && isServerQueryable(serverQueryInfoFetcher.getServerQueryInfo(serverInstanceId))) {
            numEVReplicasUp++;
          }
          if (segmentState.equals(SegmentStateModel.ERROR)) {
            errorSegments.add(Pair.of(segment, serverInstanceId));
          }
        }
      }
    }
    if (numEVReplicasUp == 0) {
      offlineSegments.add(segment);
    } else if (numEVReplicasUp < numISReplicasUp) {
      partialOnlineSegments.add(segment);
    } else {
      // Do not allow numEVReplicasUp to be larger than numISReplicasUp
      numEVReplicasUp = numISReplicasUp;
    }
    minEVReplicasUp = Math.min(minEVReplicasUp, numEVReplicasUp);
    // Total number of replicas in ideal state (including ERROR/OFFLINE states)
    int numISReplicasTotal = Math.max(instanceStateMap.size(), 1);
    minEVReplicasUpPercent = Math.min(minEVReplicasUpPercent, numEVReplicasUp * 100 / numISReplicasTotal);
  }
  if (maxISReplicasUp == Integer.MIN_VALUE) {
    // No segment had an ONLINE/CONSUMING replica in the ideal state; fall back to the ideal state's replica count
    maxISReplicasUp = getNumReplicasFromIdealState(idealState);
  }
  // Do not allow minEVReplicasUp to be larger than maxISReplicasUp
  minEVReplicasUp = Math.min(minEVReplicasUp, maxISReplicasUp);

  int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size();
  if (numSegmentsWithoutZKMetadata > 0) {
    LOGGER.warn("Table {} has {} segments without ZK metadata: {}", tableNameWithType, numSegmentsWithoutZKMetadata,
        logSegments(segmentsWithoutZKMetadata));
  }
  int numErrorSegments = errorSegments.size();
  if (numErrorSegments > 0) {
    LOGGER.warn("Table {} has {} segments in ERROR state: {}", tableNameWithType, numErrorSegments,
        logSegments(errorSegments));
  }
  int numOfflineSegments = offlineSegments.size();
  if (numOfflineSegments > 0) {
    LOGGER.warn("Table {} has {} segments without ONLINE/CONSUMING replica: {}", tableNameWithType,
        numOfflineSegments, logSegments(offlineSegments));
  }
  int numPartialOnlineSegments = partialOnlineSegments.size();
  if (numPartialOnlineSegments > 0) {
    LOGGER.warn("Table {} has {} segments with fewer replicas than the replication factor: {}", tableNameWithType,
        numPartialOnlineSegments, logSegments(partialOnlineSegments));
  }
  int numInvalidStartTime = segmentsInvalidStartTime.size();
  if (numInvalidStartTime > 0) {
    LOGGER.warn("Table {} has {} segments with invalid start time: {}", tableNameWithType, numInvalidStartTime,
        logSegments(segmentsInvalidStartTime));
  }
  int numInvalidEndTime = segmentsInvalidEndTime.size();
  if (numInvalidEndTime > 0) {
    LOGGER.warn("Table {} has {} segments with invalid end time: {}", tableNameWithType, numInvalidEndTime,
        logSegments(segmentsInvalidEndTime));
  }

  // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicasUp);
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
      minEVReplicasUpPercent);
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE,
      numErrorSegments);
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
      numOfflineSegments > 0 ? (numSegments - numOfflineSegments) * 100L / numSegments : 100);
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS,
      numPartialOnlineSegments);
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE,
      tableCompressedSize);
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME,
      numInvalidStartTime);
  _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME,
      numInvalidEndTime);

  if (tableType == TableType.REALTIME && tableConfig != null) {
    List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
    new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics,
        streamConfigs).findAndEmitMetrics(idealState);
  }
}

/**
 * Returns the replica count declared in the ideal state ({@code idealState.getReplicas()}), clamped to at least 1.
 * Falls back to 1 when the value cannot be parsed as an integer.
 *
 * @param idealState non-null ideal state of the table
 * @return the replica count, always {@code >= 1}
 */
private static int getNumReplicasFromIdealState(IdealState idealState) {
  try {
    return Math.max(Integer.parseInt(idealState.getReplicas()), 1);
  } catch (NumberFormatException e) {
    return 1;
  }
}