in pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java [154:493]
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
String taskType = MergeRollupTask.TASK_TYPE;
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
for (TableConfig tableConfig : tableConfigs) {
if (!validate(tableConfig, taskType)) {
continue;
}
String tableNameWithType = tableConfig.getTableName();
LOGGER.info("Start generating task configs for table: {} for task: {}", tableNameWithType, taskType);
// Get all segment metadata
List<SegmentZKMetadata> allSegments =
tableConfig.getTableType() == TableType.OFFLINE
? getSegmentsZKMetadataForTable(tableNameWithType)
: filterSegmentsforRealtimeTable(
getNonConsumingSegmentsZKMetadataForRealtimeTable(tableNameWithType));
// Select current segment snapshot based on lineage, filter out empty segments
SegmentLineage segmentLineage = _clusterInfoAccessor.getSegmentLineage(tableNameWithType);
Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
for (SegmentZKMetadata segment : allSegments) {
preSelectedSegmentsBasedOnLineage.add(segment.getSegmentName());
}
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage, segmentLineage);
List<SegmentZKMetadata> preSelectedSegments = new ArrayList<>();
for (SegmentZKMetadata segment : allSegments) {
if (preSelectedSegmentsBasedOnLineage.contains(segment.getSegmentName()) && segment.getTotalDocs() > 0
&& MergeTaskUtils.allowMerge(segment)) {
preSelectedSegments.add(segment);
}
}
if (preSelectedSegments.isEmpty()) {
// Reset the watermark time if no segment is found. This covers the case where the table is newly created or
// all segments for the existing table got deleted.
resetDelayMetrics(tableNameWithType);
LOGGER.info("Skip generating task: {} for table: {}, no segment is found.", taskType, tableNameWithType);
continue;
}
// Sort segments based on startTimeMs, endTimeMs and segmentName in ascending order
preSelectedSegments.sort((a, b) -> {
long aStartTime = a.getStartTimeMs();
long bStartTime = b.getStartTimeMs();
if (aStartTime != bStartTime) {
return Long.compare(aStartTime, bStartTime);
}
long aEndTime = a.getEndTimeMs();
long bEndTime = b.getEndTimeMs();
return aEndTime != bEndTime ? Long.compare(aEndTime, bEndTime)
: a.getSegmentName().compareTo(b.getSegmentName());
});
// Sort merge levels based on bucket time period
Map<String, String> taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
Map<String, Map<String, String>> mergeLevelToConfigs = MergeRollupTaskUtils.getLevelToConfigMap(taskConfigs);
List<Map.Entry<String, Map<String, String>>> sortedMergeLevelConfigs =
new ArrayList<>(mergeLevelToConfigs.entrySet());
sortedMergeLevelConfigs.sort(Comparator.comparingLong(
e -> TimeUtils.convertPeriodToMillis(e.getValue().get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY))));
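// Illustrative example (level names and periods are hypothetical, not from this config): given
// daily.bucketTimePeriod=1d and hourly.bucketTimePeriod=1h, convertPeriodToMillis yields 86_400_000 and
// 3_600_000 respectively, so the sorted order is [hourly, daily] and lower levels are always processed first.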
// Get incomplete merge levels
Set<String> inCompleteMergeLevels = new HashSet<>();
for (Map.Entry<String, TaskState> entry : TaskGeneratorUtils.getIncompleteTasks(taskType, tableNameWithType,
_clusterInfoAccessor).entrySet()) {
for (PinotTaskConfig taskConfig : _clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
inCompleteMergeLevels.add(taskConfig.getConfigs().get(MergeRollupTask.MERGE_LEVEL_KEY));
}
}
// Get the scheduling mode, which is "processFromWatermark" by default. If "processAll" mode is enabled,
// there is no watermark, and each round picks, in chronological order, the buckets that have unmerged segments.
boolean processAll = MergeTask.PROCESS_ALL_MODE.equalsIgnoreCase(taskConfigs.get(MergeTask.MODE));
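// Illustrative task config snippet (key name per MergeTask.MODE; values mirror the constants named above):
//   "mode" = "processAll"            -> scan all buckets each round, no watermark persisted
//   "mode" = "processFromWatermark"  -> resume from the per-level watermark stored in ZK (default)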
ZNRecord mergeRollupTaskZNRecord = _clusterInfoAccessor
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType);
int expectedVersion = mergeRollupTaskZNRecord != null ? mergeRollupTaskZNRecord.getVersion() : -1;
MergeRollupTaskMetadata mergeRollupTaskMetadata =
mergeRollupTaskZNRecord != null ? MergeRollupTaskMetadata.fromZNRecord(mergeRollupTaskZNRecord)
: new MergeRollupTaskMetadata(tableNameWithType, new TreeMap<>());
List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();
// Schedule tasks from lowest to highest merge level (e.g. Hourly -> Daily -> Monthly -> Yearly)
String mergeLevel = null;
for (Map.Entry<String, Map<String, String>> mergeLevelConfig : sortedMergeLevelConfigs) {
String lowerMergeLevel = mergeLevel;
mergeLevel = mergeLevelConfig.getKey();
Map<String, String> mergeConfigs = mergeLevelConfig.getValue();
// Skip scheduling if there's an incomplete task for the current mergeLevel
if (inCompleteMergeLevels.contains(mergeLevel)) {
LOGGER.info("Found incomplete task of merge level: {} for the same table: {}, Skipping task generation: {}",
mergeLevel, tableNameWithType, taskType);
continue;
}
// Get the bucket time period, buffer time period and maximum number of parallel buckets (1 by default)
String bucketPeriod = mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY);
long bucketMs = TimeUtils.convertPeriodToMillis(bucketPeriod);
if (bucketMs <= 0) {
LOGGER.error("Bucket time period: {} (table : {}, mergeLevel : {}) must be larger than 0", bucketPeriod,
tableNameWithType, mergeLevel);
continue;
}
String bufferPeriod = mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY);
long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
if (bufferMs < 0) {
LOGGER.error("Buffer time period: {} (table : {}, mergeLevel : {}) must be larger or equal to 0",
bufferPeriod, tableNameWithType, mergeLevel);
continue;
}
String maxNumParallelBucketsStr = mergeConfigs.get(MergeTask.MAX_NUM_PARALLEL_BUCKETS);
int maxNumParallelBuckets = maxNumParallelBucketsStr != null ? Integer.parseInt(maxNumParallelBucketsStr)
: DEFAULT_NUM_PARALLEL_BUCKETS;
if (maxNumParallelBuckets <= 0) {
LOGGER.error("Maximum number of parallel buckets: {} (table : {}, mergeLevel : {}) must be larger than 0",
maxNumParallelBuckets, tableNameWithType, mergeLevel);
continue;
}
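// Illustrative values (hypothetical): with bucketTimePeriod=1d and bufferTimePeriod=3d, bucketMs is
// 86_400_000 and bufferMs is 259_200_000; broadly, a 1-day bucket only becomes eligible for merging once
// its end time is at least 3 days in the past.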
// Get bucket start/end time
long preSelectedSegStartTimeMs = preSelectedSegments.get(0).getStartTimeMs();
long bucketStartMs = preSelectedSegStartTimeMs / bucketMs * bucketMs;
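// Illustrative arithmetic (hypothetical values): with bucketMs = 86_400_000 (1d) and
// preSelectedSegStartTimeMs = 1_620_050_000_000, integer division floors the start time to the bucket
// boundary: 1_620_050_000_000 / 86_400_000 * 86_400_000 = 1_620_000_000_000.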
long watermarkMs = 0;
if (!processAll) {
// Get watermark from MergeRollupTaskMetadata ZNode
// bucketStartMs = watermarkMs
// bucketEndMs = bucketStartMs + bucketMs
watermarkMs = getWatermarkMs(preSelectedSegStartTimeMs, bucketMs, mergeLevel,
mergeRollupTaskMetadata);
bucketStartMs = watermarkMs;
}
long bucketEndMs = bucketStartMs + bucketMs;
if (lowerMergeLevel == null) {
long lowestLevelMaxValidBucketEndTimeMs = Long.MIN_VALUE;
for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
// Compute lowestLevelMaxValidBucketEndTimeMs among segments that are ready for merge
long currentValidBucketEndTimeMs =
getValidBucketEndTimeMsForSegment(preSelectedSegment, bucketMs, bufferMs);
lowestLevelMaxValidBucketEndTimeMs =
Math.max(lowestLevelMaxValidBucketEndTimeMs, currentValidBucketEndTimeMs);
}
_tableLowestLevelMaxValidBucketEndTimeMs.put(tableNameWithType, lowestLevelMaxValidBucketEndTimeMs);
}
// Create metrics even if there's no task scheduled. This helps the case where the controller is restarted
// but the metrics would otherwise not be available until the controller schedules a valid task
List<String> sortedMergeLevels =
sortedMergeLevelConfigs.stream().map(Map.Entry::getKey).collect(Collectors.toList());
if (processAll) {
createOrUpdateNumBucketsToProcessMetrics(tableNameWithType, mergeLevel, lowerMergeLevel, bufferMs, bucketMs,
preSelectedSegments, sortedMergeLevels);
} else {
createOrUpdateDelayMetrics(tableNameWithType, mergeLevel, null, watermarkMs, bufferMs, bucketMs);
}
if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata, processAll)) {
LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}, mode : {}) cannot be merged yet",
bucketStartMs, bucketEndMs, tableNameWithType, mergeLevel, processAll ? MergeTask.PROCESS_ALL_MODE
: MergeTask.PROCESS_FROM_WATERMARK_MODE);
continue;
}
// Find overlapping segments for each bucket, skip the buckets that have all segments merged
List<List<SegmentZKMetadata>> selectedSegmentsForAllBuckets = new ArrayList<>(maxNumParallelBuckets);
List<SegmentZKMetadata> selectedSegmentsForBucket = new ArrayList<>();
boolean hasUnmergedSegments = false;
boolean hasSpilledOverData = false;
boolean areAllSegmentsReadyToMerge = true;
// The for loop terminates in the following cases:
// 1. Found buckets with unmerged segments:
// For each bucket, find all segments overlapping with the target bucket, and skip the bucket if all
// overlapping segments are merged. Schedule at most k (maxNumParallelBuckets) buckets, and stop at the
// first bucket that contains spilled-over data.
// One may wonder how a segment with records spanning different buckets is handled. The short answer is
// that it will be cut into multiple segments, one per bucket. This is achieved by setting the bucket
// time period as PARTITION_BUCKET_TIME_PERIOD when generating the PinotTaskConfigs
// 2. There's no bucket with unmerged segments, skip scheduling
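// Illustrative walk-through (hypothetical segments, 1d buckets, maxNumParallelBuckets = 2, all segments
// assumed ready to merge at the lower level):
//   s1 [day0 00:00, day0 02:00) already merged at this level
//   s2 [day0 03:00, day0 05:00) unmerged
//   s3 [day2 01:00, day2 04:00) unmerged
// Bucket day0 (holding s1 and s2) is selected because s2 is unmerged; s3 starts past day0's bucket end,
// so the loop jumps to bucket day2, which is added as the last bucket after the loop, for 2 buckets total.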
for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
long startTimeMs = preSelectedSegment.getStartTimeMs();
if (startTimeMs < bucketEndMs) {
long endTimeMs = preSelectedSegment.getEndTimeMs();
if (endTimeMs >= bucketStartMs) {
// For segments overlapping with current bucket, add to the result list
if (!isMergedSegment(preSelectedSegment, mergeLevel, sortedMergeLevels)) {
hasUnmergedSegments = true;
}
if (!isMergedSegment(preSelectedSegment, lowerMergeLevel, sortedMergeLevels)) {
areAllSegmentsReadyToMerge = false;
}
if (hasSpilledOverData(preSelectedSegment, bucketMs)) {
hasSpilledOverData = true;
}
selectedSegmentsForBucket.add(preSelectedSegment);
}
// endTimeMs < bucketStartMs
// Haven't found the first overlapping segment yet, continue to the next segment
} else {
// Has gone through all overlapping segments for current bucket
if (hasUnmergedSegments && areAllSegmentsReadyToMerge) {
// Add the bucket if it has unmerged segments and all of its segments are ready to merge
selectedSegmentsForAllBuckets.add(selectedSegmentsForBucket);
}
if (selectedSegmentsForAllBuckets.size() == maxNumParallelBuckets || hasSpilledOverData) {
// If there are enough buckets or spilled-over data was found, stop collecting and schedule merge tasks
break;
} else {
// Start with a new bucket
// TODO: If there are many small merged segments, we should merge them again
selectedSegmentsForBucket = new ArrayList<>();
hasUnmergedSegments = false;
areAllSegmentsReadyToMerge = true;
bucketStartMs = (startTimeMs / bucketMs) * bucketMs;
bucketEndMs = bucketStartMs + bucketMs;
if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata, processAll)) {
break;
}
if (!isMergedSegment(preSelectedSegment, mergeLevel, sortedMergeLevels)) {
hasUnmergedSegments = true;
}
if (!isMergedSegment(preSelectedSegment, lowerMergeLevel, sortedMergeLevels)) {
areAllSegmentsReadyToMerge = false;
}
if (hasSpilledOverData(preSelectedSegment, bucketMs)) {
hasSpilledOverData = true;
}
selectedSegmentsForBucket.add(preSelectedSegment);
}
}
}
// Add the last bucket if it contains unmerged segments and hasn't been added yet
if (hasUnmergedSegments && areAllSegmentsReadyToMerge && (selectedSegmentsForAllBuckets.isEmpty() || (
selectedSegmentsForAllBuckets.get(selectedSegmentsForAllBuckets.size() - 1)
!= selectedSegmentsForBucket))) {
selectedSegmentsForAllBuckets.add(selectedSegmentsForBucket);
}
if (selectedSegmentsForAllBuckets.isEmpty()) {
LOGGER.info("No unmerged segment found for table: {}, mergeLevel: {}", tableNameWithType, mergeLevel);
continue;
}
// Bump up the watermark to the earliest start time of the selected segments, truncated down to the bucket boundary
long newWatermarkMs = selectedSegmentsForAllBuckets.get(0).get(0).getStartTimeMs() / bucketMs * bucketMs;
mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, newWatermarkMs);
LOGGER.info("Update watermark for table: {}, mergeLevel: {} from: {} to: {}", tableNameWithType, mergeLevel,
watermarkMs, newWatermarkMs);
// Update the delay metrics
if (!processAll) {
createOrUpdateDelayMetrics(tableNameWithType, mergeLevel, lowerMergeLevel, newWatermarkMs, bufferMs,
bucketMs);
}
// Create task configs
int maxNumRecordsPerTask =
mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null ? Integer.parseInt(
mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY)) : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
if (segmentPartitionConfig == null) {
for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
pinotTaskConfigsForTable.addAll(
createPinotTaskConfigs(selectedSegmentsPerBucket, tableConfig, maxNumRecordsPerTask, mergeLevel,
null, mergeConfigs, taskConfigs));
}
} else {
// For a partitioned table, schedule separate tasks for each partitionId (the partitionId is constructed
// from the partitions of all partition columns. The partition columns of a segment must exactly match the
// partition columns of the table configuration, and the segment metadata must contain exactly one
// partition per column). Segments that do not meet these conditions are treated as outlier segments, and
// additional tasks are generated for them.
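// Illustrative example (column and partition values hypothetical): for a table partitioned on "memberId",
// a segment whose partition metadata is {memberId: [3]} maps to partition key [3], while a segment with
// {memberId: [1, 3]} (more than one partition) or with no partition metadata at all becomes an outlier
// segment and is merged in a separate task.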
Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
List<String> partitionColumns = new ArrayList<>(columnPartitionMap.keySet());
for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
Map<List<Integer>, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();
List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
for (SegmentZKMetadata selectedSegment : selectedSegmentsPerBucket) {
SegmentPartitionMetadata segmentPartitionMetadata = selectedSegment.getPartitionMetadata();
List<Integer> partitions = new ArrayList<>();
if (segmentPartitionMetadata != null && columnPartitionMap.keySet()
.equals(segmentPartitionMetadata.getColumnPartitionMap().keySet())) {
for (String partitionColumn : partitionColumns) {
if (segmentPartitionMetadata.getPartitions(partitionColumn).size() == 1) {
partitions.add(segmentPartitionMetadata.getPartitions(partitionColumn).iterator().next());
} else {
partitions.clear();
break;
}
}
}
if (partitions.isEmpty()) {
outlierSegments.add(selectedSegment);
} else {
partitionToSegments.computeIfAbsent(partitions, k -> new ArrayList<>()).add(selectedSegment);
}
}
for (Map.Entry<List<Integer>, List<SegmentZKMetadata>> entry : partitionToSegments.entrySet()) {
List<Integer> partition = entry.getKey();
List<SegmentZKMetadata> partitionedSegments = entry.getValue();
pinotTaskConfigsForTable.addAll(
createPinotTaskConfigs(partitionedSegments, tableConfig, maxNumRecordsPerTask, mergeLevel,
partition, mergeConfigs, taskConfigs));
}
if (!outlierSegments.isEmpty()) {
pinotTaskConfigsForTable.addAll(
createPinotTaskConfigs(outlierSegments, tableConfig, maxNumRecordsPerTask, mergeLevel,
null, mergeConfigs, taskConfigs));
}
}
}
}
// Write updated watermark map to zookeeper
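// The ZNode version captured when the metadata was read (expectedVersion) makes this write a
// compare-and-set: if another scheduler updated the metadata concurrently, the version check fails,
// a ZkException is thrown, and this table is skipped for the current round.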
if (!processAll) {
try {
_clusterInfoAccessor
.setMinionTaskMetadata(mergeRollupTaskMetadata, MinionConstants.MergeRollupTask.TASK_TYPE,
expectedVersion);
} catch (ZkException e) {
LOGGER.error(
"Version changed while updating merge/rollup task metadata for table: {}, skipping task scheduling. "
+ "There may be multiple task schedulers for the same table; this needs investigation!",
tableNameWithType, e);
continue;
}
}
pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
LOGGER.info("Finished generating task configs for table: {} for task: {}, numTasks: {}", tableNameWithType,
taskType, pinotTaskConfigsForTable.size());
}
// Clean up metrics
cleanUpDelayMetrics(tableConfigs);
return pinotTaskConfigs;
}