in pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java [1455:1655]
/**
 * Walks every partition that has segment ZK metadata and repairs the IdealState / segment ZK metadata so that each
 * partition that has not reached end of life ends up with a CONSUMING segment; partitions from
 * {@code partitionGroupMetadataList} that have no segment ZK metadata yet get a brand-new consuming segment.
 *
 * <p>Repairs are applied to the map fields obtained from {@code idealState.getRecord()}, and the same
 * {@code IdealState} instance is returned. New segment ZK metadata may be created through
 * {@code createNewSegmentZKMetadata}, {@code createNewConsumingSegment} and {@code setupNewPartitionGroup}.
 *
 * @param tableConfig realtime table config; its table name is used as the realtime table name
 * @param streamConfigs stream configs of the table; the first entry is used to build the offset factory and new
 *     segments, while the whole list is passed to {@code fetchPartitionGroupIdToSmallestOffset}
 * @param idealState current ideal state of the table; repaired in place
 * @param partitionGroupMetadataList latest partition group metadata; provides the start offset per partition, and a
 *     partition missing from it is treated as having reached end of life when repairing a DONE/COMMITTING segment
 * @param offsetCriteria offset criteria used when choosing the start offset of a recreated consuming segment; may be
 *     {@code null}
 * @return {@code idealState}, after the in-place repair
 */
IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamConfig> streamConfigs,
    IdealState idealState, List<PartitionGroupMetadata> partitionGroupMetadataList, OffsetCriteria offsetCriteria) {
  String realtimeTableName = tableConfig.getTableName();
  InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
  int numReplicas = getNumReplicas(tableConfig, instancePartitions);
  int numPartitions = partitionGroupMetadataList.size();
  SegmentAssignment segmentAssignment =
      SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
  Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
      Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
  // Obtained from idealState's record; every repair below is applied to this map and idealState is returned as-is.
  Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
  // The first stream config drives offset parsing and the creation of new segments.
  StreamConfig firstStreamConfig = streamConfigs.get(0);
  StreamPartitionMsgOffsetFactory offsetFactory =
      StreamConsumerFactoryProvider.create(firstStreamConfig).createStreamMsgOffsetFactory();
  // Get the latest segment ZK metadata for each partition
  Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName);
  // Create a map from partition id to start offset
  // TODO: Directly return map from StreamMetadataProvider
  Map<Integer, StreamPartitionMsgOffset> partitionIdToStartOffset = Maps.newHashMapWithExpectedSize(numPartitions);
  for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
    partitionIdToStartOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
  }
  // Create a map from partition id to the smallest stream offset.
  // When the criteria is already SMALLEST, the start offsets above are reused; otherwise the map is fetched lazily
  // (at most once) the first time it is needed below.
  Map<Integer, StreamPartitionMsgOffset> partitionIdToSmallestOffset = null;
  if (offsetCriteria != null && offsetCriteria.equals(OffsetCriteria.SMALLEST_OFFSET_CRITERIA)) {
    partitionIdToSmallestOffset = partitionIdToStartOffset;
  }
  // Walk over all partitions that we have metadata for, and repair any partitions necessary.
  // Possible things to repair:
  // 1. The latest metadata is in DONE/COMMITTING state, but the idealstate says segment is CONSUMING:
  //    a. Create metadata for next segment and find hosts to assign it to.
  //    b. update current segment in idealstate to ONLINE (only if partition is present in newPartitionGroupMetadata)
  //    c. add new segment in idealstate to CONSUMING on the hosts (only if partition is present in
  //       newPartitionGroupMetadata)
  // 2. The latest metadata is in DONE state, but the idealstate has no segment in CONSUMING state.
  //    a. Create metadata for new IN_PROGRESS segment with startOffset set to latest segment's end offset.
  //    b. Add the newly created segment to idealstate with segment state set to CONSUMING.
  // 3. The latest metadata is IN_PROGRESS, but segment is not there in idealstate.
  //    a. change prev segment to ONLINE in idealstate
  //    b. add latest segment to CONSUMING in idealstate.
  // 4. All instances of a segment are in OFFLINE state.
  //    a. Create a new segment (with the next seq number)
  //       and restart consumption from the same offset (if possible) or a newer offset (if realtime stream does
  //       not have the same offset).
  //       In latter case, report data loss.
  long currentTimeMs = getCurrentTimeMs();
  // This is the expected segment status after completion of first of the 3 steps of the segment commit protocol
  // The status in step one is updated to
  // 1. DONE for normal consumption
  // 2. COMMITTING for pauseless consumption
  Status statusPostSegmentMetadataUpdate =
      PauselessConsumptionUtils.isPauselessEnabled(tableConfig) ? Status.COMMITTING : Status.DONE;
  for (Map.Entry<Integer, SegmentZKMetadata> entry : latestSegmentZKMetadataMap.entrySet()) {
    int partitionId = entry.getKey();
    SegmentZKMetadata latestSegmentZKMetadata = entry.getValue();
    String latestSegmentName = latestSegmentZKMetadata.getSegmentName();
    LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentName);
    Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegmentName);
    if (instanceStateMap != null) {
      // Latest segment of metadata is in idealstate.
      if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
        if (latestSegmentZKMetadata.getStatus() == statusPostSegmentMetadataUpdate) {
          // step-1 of commitSegmentMetadata is done (i.e. marking old segment as DONE/COMMITTING)
          // but step-2 is not done (i.e. adding new metadata for the next segment)
          // and ideal state update (i.e. marking old segment as ONLINE and new segment as CONSUMING) is not done
          // either.
          // Give the in-flight commit a chance to finish before repairing it here.
          if (!isExceededMaxSegmentCompletionTime(realtimeTableName, latestSegmentName, currentTimeMs)) {
            continue;
          }
          if (partitionIdToStartOffset.containsKey(partitionId)) {
            LOGGER.info("Repairing segment: {} which is {} in segment ZK metadata, but is CONSUMING in IdealState",
                latestSegmentName, statusPostSegmentMetadataUpdate);
            LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
            String newSegmentName = newLLCSegmentName.getSegmentName();
            CommittingSegmentDescriptor committingSegmentDescriptor =
                new CommittingSegmentDescriptor(latestSegmentName,
                    (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
            createNewSegmentZKMetadata(tableConfig, firstStreamConfig, newLLCSegmentName, currentTimeMs,
                committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
                segmentAssignment, instancePartitionsMap);
          } else { // partition group reached end of life
            LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
                + "Skipping creation of new ZK metadata and new segment in ideal state", partitionId,
                latestSegmentName);
            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
                instancePartitionsMap);
          }
        }
        // else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment.
      } else {
        // No replica in CONSUMING state
        // Possible scenarios: for any of these scenarios, we need to create a new CONSUMING segment unless the stream
        // partition has reached end of life
        // 1. All replicas OFFLINE and metadata IN_PROGRESS/DONE - a segment marked itself OFFLINE during consumption
        //    for some reason
        // 2. All replicas ONLINE and metadata DONE/UPLOADED/COMMITTING
        // 3. We should never end up with some replicas ONLINE and some OFFLINE
        boolean allInstancesOffline = isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE);
        boolean allInstancesOnlineAndMetadataNotInProgress =
            isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE) && (latestSegmentZKMetadata.getStatus()
                != Status.IN_PROGRESS);
        if (!allInstancesOffline && !allInstancesOnlineAndMetadataNotInProgress) {
          LOGGER.error("Got unexpected instance state map: {} for segment: {} with status: {}", instanceStateMap,
              latestSegmentName, latestSegmentZKMetadata.getStatus());
          continue;
        }
        // Smallest offset is fetched from stream once and cached in partitionIdToSmallestOffset.
        if (partitionIdToSmallestOffset == null) {
          partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs, idealState);
        }
        // Do not create new CONSUMING segment when the stream partition has reached end of life.
        if (!partitionIdToSmallestOffset.containsKey(partitionId)) {
          LOGGER.info("PartitionGroup: {} has reached end of life. Skipping creation of new segment {}",
              partitionId, latestSegmentName);
          continue;
        }
        // Both scenarios recreate a CONSUMING segment; they only differ in which offset recorded in the latest
        // segment's ZK metadata is handed to selectStartOffset.
        StreamPartitionMsgOffset startOffset;
        if (allInstancesOffline) {
          LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName);
          // segments are OFFLINE; start from beginning of the latest segment
          startOffset =
              selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset,
                  realtimeTableName, offsetFactory, latestSegmentZKMetadata.getStartOffset());
        } else {
          LOGGER.info("Resuming consumption for partition: {} of table: {}", partitionId, realtimeTableName);
          // segment completed (all ONLINE); continue after the latest segment's end offset
          startOffset =
              selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset,
                  realtimeTableName, offsetFactory, latestSegmentZKMetadata.getEndOffset());
        }
        createNewConsumingSegment(tableConfig, firstStreamConfig, latestSegmentZKMetadata, currentTimeMs,
            partitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
            instancePartitionsMap, startOffset);
      }
    } else {
      // idealstate does not have an entry for the segment (but metadata is present)
      // controller has failed between step-2 and step-3 of commitSegmentMetadata.
      // i.e. after updating old segment metadata (old segment metadata state = DONE/COMMITTING)
      // and creating new segment metadata (new segment metadata state = IN_PROGRESS),
      // but before updating ideal state (new segment missing from ideal state)
      if (!isExceededMaxSegmentCompletionTime(realtimeTableName, latestSegmentName, currentTimeMs)) {
        continue;
      }
      LOGGER.info("Repairing segment: {} which has segment ZK metadata but does not exist in IdealState",
          latestSegmentName);
      if (latestSegmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
        // Find the previous CONSUMING segment
        String previousConsumingSegment = null;
        for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) {
          // The CONSUMING check runs first, so only CONSUMING segment names are parsed as LLC names.
          if (segmentEntry.getValue().containsValue(SegmentStateModel.CONSUMING)
              && new LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() == partitionId) {
            previousConsumingSegment = segmentEntry.getKey();
            break;
          }
        }
        if (previousConsumingSegment == null) {
          LOGGER.error(
              "Failed to find previous CONSUMING segment for partition: {} of table: {}, potential data loss",
              partitionId, realtimeTableName);
          _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
        }
        // A null previous segment is still passed through so the new segment gets added as CONSUMING.
        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, latestSegmentName,
            segmentAssignment, instancePartitionsMap);
      } else {
        LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}",
            latestSegmentZKMetadata.getStatus(), latestSegmentName);
      }
    }
  }
  // Set up new partitions if not exist
  for (PartitionGroupMetadata partitionGroupMetadata : partitionGroupMetadataList) {
    int partitionId = partitionGroupMetadata.getPartitionGroupId();
    if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
      String newSegmentName =
          setupNewPartitionGroup(tableConfig, firstStreamConfig, partitionGroupMetadata, currentTimeMs,
              instancePartitions, numPartitions, numReplicas);
      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
          instancePartitionsMap);
    }
  }
  return idealState;
}