public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs)

in pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java [154:493]


  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
    String taskType = MergeRollupTask.TASK_TYPE;
    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
    for (TableConfig tableConfig : tableConfigs) {
      if (!validate(tableConfig, taskType)) {
        continue;
      }
      String tableNameWithType = tableConfig.getTableName();
      LOGGER.info("Start generating task configs for table: {} for task: {}", tableNameWithType, taskType);

      // Get all segment metadata
      List<SegmentZKMetadata> allSegments = tableConfig.getTableType() == TableType.OFFLINE
          ? getSegmentsZKMetadataForTable(tableNameWithType)
          : filterSegmentsforRealtimeTable(getNonConsumingSegmentsZKMetadataForRealtimeTable(tableNameWithType));

      // Select current segment snapshot based on lineage, filter out empty segments
      SegmentLineage segmentLineage = _clusterInfoAccessor.getSegmentLineage(tableNameWithType);
      Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
      for (SegmentZKMetadata segment : allSegments) {
        preSelectedSegmentsBasedOnLineage.add(segment.getSegmentName());
      }
      SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage, segmentLineage);
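      // Example (hypothetical segment names): if the lineage records a completed REPLACE of [seg_1, seg_2] with
      // [merged_seg_1], this filter drops seg_1 and seg_2 and keeps merged_seg_1, so already-replaced segments are
      // never selected again.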

      List<SegmentZKMetadata> preSelectedSegments = new ArrayList<>();
      for (SegmentZKMetadata segment : allSegments) {
        if (preSelectedSegmentsBasedOnLineage.contains(segment.getSegmentName()) && segment.getTotalDocs() > 0
            && MergeTaskUtils.allowMerge(segment)) {
          preSelectedSegments.add(segment);
        }
      }

      if (preSelectedSegments.isEmpty()) {
        // Reset the watermark time if no segments are found. This covers the case where the table is newly created
        // or all segments of an existing table were deleted.
        resetDelayMetrics(tableNameWithType);
        LOGGER.info("Skip generating task: {} for table: {}, no segment is found.", taskType, tableNameWithType);
        continue;
      }

      // Sort segments based on startTimeMs, endTimeMs and segmentName in ascending order
      preSelectedSegments.sort((a, b) -> {
        long aStartTime = a.getStartTimeMs();
        long bStartTime = b.getStartTimeMs();
        if (aStartTime != bStartTime) {
          return Long.compare(aStartTime, bStartTime);
        }
        long aEndTime = a.getEndTimeMs();
        long bEndTime = b.getEndTimeMs();
        return aEndTime != bEndTime ? Long.compare(aEndTime, bEndTime)
            : a.getSegmentName().compareTo(b.getSegmentName());
      });

      // Sort merge levels based on bucket time period
      Map<String, String> taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
      Map<String, Map<String, String>> mergeLevelToConfigs = MergeRollupTaskUtils.getLevelToConfigMap(taskConfigs);
      List<Map.Entry<String, Map<String, String>>> sortedMergeLevelConfigs =
          new ArrayList<>(mergeLevelToConfigs.entrySet());
      sortedMergeLevelConfigs.sort(Comparator.comparingLong(
          e -> TimeUtils.convertPeriodToMillis(e.getValue().get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY))));
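      // For example (hypothetical levels), bucket periods {"hourly": "1h", "daily": "1d", "monthly": "30d"} sort
      // into [hourly, daily, monthly], so lower merge levels are always considered before higher ones.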

      // Get incomplete merge levels
      Set<String> inCompleteMergeLevels = new HashSet<>();
      for (Map.Entry<String, TaskState> entry : TaskGeneratorUtils.getIncompleteTasks(taskType, tableNameWithType,
          _clusterInfoAccessor).entrySet()) {
        for (PinotTaskConfig taskConfig : _clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
          inCompleteMergeLevels.add(taskConfig.getConfigs().get(MergeRollupTask.MERGE_LEVEL_KEY));
        }
      }

      // Get the scheduling mode, which is "processFromWatermark" by default. If "processAll" mode is enabled, there
      // is no watermark, and each round picks the buckets, in chronological order, that still have unmerged segments.
      boolean processAll = MergeTask.PROCESS_ALL_MODE.equalsIgnoreCase(taskConfigs.get(MergeTask.MODE));

      ZNRecord mergeRollupTaskZNRecord = _clusterInfoAccessor
          .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType);
      int expectedVersion = mergeRollupTaskZNRecord != null ? mergeRollupTaskZNRecord.getVersion() : -1;
      MergeRollupTaskMetadata mergeRollupTaskMetadata =
          mergeRollupTaskZNRecord != null ? MergeRollupTaskMetadata.fromZNRecord(mergeRollupTaskZNRecord)
              : new MergeRollupTaskMetadata(tableNameWithType, new TreeMap<>());
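      // The ZNRecord version captured above is passed back when persisting the watermark map at the end of this
      // round, so a concurrent write to the same metadata fails with a ZkException (optimistic concurrency control)
      // instead of silently overwriting the other scheduler's update.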
      List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();

      // Schedule tasks from lowest to highest merge level (e.g. Hourly -> Daily -> Monthly -> Yearly)
      String mergeLevel = null;
      for (Map.Entry<String, Map<String, String>> mergeLevelConfig : sortedMergeLevelConfigs) {
        String lowerMergeLevel = mergeLevel;
        mergeLevel = mergeLevelConfig.getKey();
        Map<String, String> mergeConfigs = mergeLevelConfig.getValue();

        // Skip scheduling if there's an incomplete task for the current mergeLevel
        if (inCompleteMergeLevels.contains(mergeLevel)) {
          LOGGER.info("Found incomplete task of merge level: {} for table: {}, skipping task generation: {}",
              mergeLevel, tableNameWithType, taskType);
          continue;
        }

        // Get the bucket time period, buffer time period, and maximum number of parallel buckets (1 by default)
        String bucketPeriod = mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY);
        long bucketMs = TimeUtils.convertPeriodToMillis(bucketPeriod);
        if (bucketMs <= 0) {
          LOGGER.error("Bucket time period: {} (table : {}, mergeLevel : {}) must be larger than 0", bucketPeriod,
              tableNameWithType, mergeLevel);
          continue;
        }
        String bufferPeriod = mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY);
        long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
        if (bufferMs < 0) {
          LOGGER.error("Buffer time period: {} (table : {}, mergeLevel : {}) must be larger or equal to 0",
              bufferPeriod, tableNameWithType, mergeLevel);
          continue;
        }
        String maxNumParallelBucketsStr = mergeConfigs.get(MergeTask.MAX_NUM_PARALLEL_BUCKETS);
        int maxNumParallelBuckets = maxNumParallelBucketsStr != null ? Integer.parseInt(maxNumParallelBucketsStr)
            : DEFAULT_NUM_PARALLEL_BUCKETS;
        if (maxNumParallelBuckets <= 0) {
          LOGGER.error("Maximum number of parallel buckets: {} (table : {}, mergeLevel : {}) must be larger than 0",
              maxNumParallelBuckets, tableNameWithType, mergeLevel);
          continue;
        }
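        // For example, with maxNumParallelBuckets = 3 (hypothetical value), up to three time buckets can be merged
        // in the same scheduling round, trading more concurrent minion tasks for faster catch-up.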

        // Get bucket start/end time
        long preSelectedSegStartTimeMs = preSelectedSegments.get(0).getStartTimeMs();
        long bucketStartMs = preSelectedSegStartTimeMs / bucketMs * bucketMs;
        long watermarkMs = 0;
        if (!processAll) {
          // Get watermark from MergeRollupTaskMetadata ZNode
          // bucketStartMs = watermarkMs
          // bucketEndMs = bucketStartMs + bucketMs
          watermarkMs = getWatermarkMs(preSelectedSegStartTimeMs, bucketMs, mergeLevel,
              mergeRollupTaskMetadata);
          bucketStartMs = watermarkMs;
        }
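        // Worked example (hypothetical values): with bucketMs = 86_400_000 (1d) and
        // preSelectedSegStartTimeMs = 1_700_050_000_000, the division truncates to
        // bucketStartMs = 1_700_006_400_000, i.e. the 00:00 UTC boundary of that day. In watermark mode the
        // persisted watermark is used as the bucket start instead.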
        long bucketEndMs = bucketStartMs + bucketMs;
        if (lowerMergeLevel == null) {
          long lowestLevelMaxValidBucketEndTimeMs = Long.MIN_VALUE;
          for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
            // Compute lowestLevelMaxValidBucketEndTimeMs among segments that are ready for merge
            long currentValidBucketEndTimeMs =
                getValidBucketEndTimeMsForSegment(preSelectedSegment, bucketMs, bufferMs);
            lowestLevelMaxValidBucketEndTimeMs =
                Math.max(lowestLevelMaxValidBucketEndTimeMs, currentValidBucketEndTimeMs);
          }
          _tableLowestLevelMaxValidBucketEndTimeMs.put(tableNameWithType, lowestLevelMaxValidBucketEndTimeMs);
        }
        // Create metrics even if no task is scheduled; this covers the case where the controller is restarted and
        // the metrics would otherwise not be available until the controller schedules a valid task
        List<String> sortedMergeLevels =
            sortedMergeLevelConfigs.stream().map(Map.Entry::getKey).collect(Collectors.toList());
        if (processAll) {
          createOrUpdateNumBucketsToProcessMetrics(tableNameWithType, mergeLevel, lowerMergeLevel, bufferMs, bucketMs,
              preSelectedSegments, sortedMergeLevels);
        } else {
          createOrUpdateDelayMetrics(tableNameWithType, mergeLevel, null, watermarkMs, bufferMs, bucketMs);
        }

        if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata, processAll)) {
          LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}, mode : {}) cannot be merged yet",
              bucketStartMs, bucketEndMs, tableNameWithType, mergeLevel, processAll ? MergeTask.PROCESS_ALL_MODE
                  : MergeTask.PROCESS_FROM_WATERMARK_MODE);
          continue;
        }
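        // The buffer keeps a bucket from being merged while late data may still arrive: with a 1d bucket and a
        // bufferMs of 2h (hypothetical values), the bucket ending at 00:00 only becomes eligible after 02:00. Higher
        // merge levels additionally wait for the lower level's watermark to pass the bucket end.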

        // Find overlapping segments for each bucket, and skip the buckets whose segments have all been merged
        List<List<SegmentZKMetadata>> selectedSegmentsForAllBuckets = new ArrayList<>(maxNumParallelBuckets);
        List<SegmentZKMetadata> selectedSegmentsForBucket = new ArrayList<>();
        boolean hasUnmergedSegments = false;
        boolean hasSpilledOverData = false;
        boolean areAllSegmentsReadyToMerge = true;

        // The for loop terminates in one of the following cases:
        // 1. Found buckets with unmerged segments:
        //    For each bucket, find all segments overlapping with the target bucket, and skip the bucket if all of its
        //    overlapping segments are merged. Schedule at most k (maxNumParallelBuckets) buckets, stopping at the
        //    first bucket that contains spilled-over data.
        //    One may wonder how a segment with records spanning multiple buckets is handled. The short answer is that
        //    it will be cut into multiple segments, one per bucket. This is achieved by setting the bucket time
        //    period as PARTITION_BUCKET_TIME_PERIOD when generating PinotTaskConfigs
        // 2. No bucket has unmerged segments: skip scheduling
        for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
          long startTimeMs = preSelectedSegment.getStartTimeMs();
          if (startTimeMs < bucketEndMs) {
            long endTimeMs = preSelectedSegment.getEndTimeMs();
            if (endTimeMs >= bucketStartMs) {
              // For segments overlapping with current bucket, add to the result list
              if (!isMergedSegment(preSelectedSegment, mergeLevel, sortedMergeLevels)) {
                hasUnmergedSegments = true;
              }
              if (!isMergedSegment(preSelectedSegment, lowerMergeLevel, sortedMergeLevels)) {
                areAllSegmentsReadyToMerge = false;
              }
              if (hasSpilledOverData(preSelectedSegment, bucketMs)) {
                hasSpilledOverData = true;
              }
              selectedSegmentsForBucket.add(preSelectedSegment);
            }
            // endTimeMs < bucketStartMs
            // Haven't found the first overlapping segment, continue to the next segment
          } else {
            // Gone through all overlapping segments for the current bucket
            if (hasUnmergedSegments && areAllSegmentsReadyToMerge) {
              // Add the bucket if it has unmerged segments and all of its segments are ready to merge
              selectedSegmentsForAllBuckets.add(selectedSegmentsForBucket);
            }

            if (selectedSegmentsForAllBuckets.size() == maxNumParallelBuckets || hasSpilledOverData) {
              // If there are enough buckets or found spilled over data, schedule merge tasks
              break;
            } else {
              // Start with a new bucket
              // TODO: If there are many small merged segments, we should merge them again
              selectedSegmentsForBucket = new ArrayList<>();
              hasUnmergedSegments = false;
              areAllSegmentsReadyToMerge = true;
              bucketStartMs = (startTimeMs / bucketMs) * bucketMs;
              bucketEndMs = bucketStartMs + bucketMs;
              if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata, processAll)) {
                break;
              }
              if (!isMergedSegment(preSelectedSegment, mergeLevel, sortedMergeLevels)) {
                hasUnmergedSegments = true;
              }
              if (!isMergedSegment(preSelectedSegment, lowerMergeLevel, sortedMergeLevels)) {
                areAllSegmentsReadyToMerge = false;
              }
              if (hasSpilledOverData(preSelectedSegment, bucketMs)) {
                hasSpilledOverData = true;
              }
              selectedSegmentsForBucket.add(preSelectedSegment);
            }
          }
        }
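        // Illustration (hypothetical): with 1d buckets, a segment whose data covers two days sets hasSpilledOverData,
        // so no buckets beyond it are scheduled in this round; the task executor later splits such segments on
        // bucket boundaries via PARTITION_BUCKET_TIME_PERIOD.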

        // Add the last bucket if it contains unmerged segments and is not added before
        if (hasUnmergedSegments && areAllSegmentsReadyToMerge && (selectedSegmentsForAllBuckets.isEmpty() || (
            selectedSegmentsForAllBuckets.get(selectedSegmentsForAllBuckets.size() - 1)
                != selectedSegmentsForBucket))) {
          selectedSegmentsForAllBuckets.add(selectedSegmentsForBucket);
        }

        if (selectedSegmentsForAllBuckets.isEmpty()) {
          LOGGER.info("No unmerged segment found for table: {}, mergeLevel: {}", tableNameWithType, mergeLevel);
          continue;
        }

        // Bump up the watermark to the earliest start time of the selected segments, truncated down to the bucket boundary
        long newWatermarkMs = selectedSegmentsForAllBuckets.get(0).get(0).getStartTimeMs() / bucketMs * bucketMs;
        mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, newWatermarkMs);
        LOGGER.info("Update watermark for table: {}, mergeLevel: {} from: {} to: {}", tableNameWithType, mergeLevel,
            watermarkMs, newWatermarkMs);

        // Update the delay metrics
        if (!processAll) {
          createOrUpdateDelayMetrics(tableNameWithType, mergeLevel, lowerMergeLevel, newWatermarkMs, bufferMs,
              bucketMs);
        }

        // Create task configs
        int maxNumRecordsPerTask =
            mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null ? Integer.parseInt(
                mergeConfigs.get(MergeRollupTask.MAX_NUM_RECORDS_PER_TASK_KEY)) : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
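        // maxNumRecordsPerTask caps how many records a single minion task processes; createPinotTaskConfigs splits
        // the segments of a bucket into multiple tasks when their total document count exceeds this bound.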
        SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
        if (segmentPartitionConfig == null) {
          for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
            pinotTaskConfigsForTable.addAll(
                createPinotTaskConfigs(selectedSegmentsPerBucket, tableConfig, maxNumRecordsPerTask, mergeLevel,
                    null, mergeConfigs, taskConfigs));
          }
        } else {
          // For a partitioned table, schedule separate tasks for each partitionId (the partitionId is constructed
          // from the partitions of all partition columns. The partition columns of a segment must exactly match the
          // partition columns of the table config, and each column must have exactly one partition in the segment
          // metadata). Segments that do not meet these conditions are treated as outlier segments, and additional
          // tasks are generated for them.
          Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
          List<String> partitionColumns = new ArrayList<>(columnPartitionMap.keySet());
          for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
            Map<List<Integer>, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();
            List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
            for (SegmentZKMetadata selectedSegment : selectedSegmentsPerBucket) {
              SegmentPartitionMetadata segmentPartitionMetadata = selectedSegment.getPartitionMetadata();
              List<Integer> partitions = new ArrayList<>();
              if (segmentPartitionMetadata != null && columnPartitionMap.keySet()
                  .equals(segmentPartitionMetadata.getColumnPartitionMap().keySet())) {
                for (String partitionColumn : partitionColumns) {
                  if (segmentPartitionMetadata.getPartitions(partitionColumn).size() == 1) {
                    partitions.add(segmentPartitionMetadata.getPartitions(partitionColumn).iterator().next());
                  } else {
                    partitions.clear();
                    break;
                  }
                }
              }
              if (partitions.isEmpty()) {
                outlierSegments.add(selectedSegment);
              } else {
                partitionToSegments.computeIfAbsent(partitions, k -> new ArrayList<>()).add(selectedSegment);
              }
            }
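            // Example (hypothetical columns): with partition columns [memberId, eventType], a segment whose metadata
            // maps memberId -> {3} and eventType -> {1} gets the composite key [3, 1]. A segment missing a partition
            // column, or having more than one partition for a column, ends up in outlierSegments and is merged via a
            // separate task.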

            for (Map.Entry<List<Integer>, List<SegmentZKMetadata>> entry : partitionToSegments.entrySet()) {
              List<Integer> partition = entry.getKey();
              List<SegmentZKMetadata> partitionedSegments = entry.getValue();
              pinotTaskConfigsForTable.addAll(
                  createPinotTaskConfigs(partitionedSegments, tableConfig, maxNumRecordsPerTask, mergeLevel,
                      partition, mergeConfigs, taskConfigs));
            }

            if (!outlierSegments.isEmpty()) {
              pinotTaskConfigsForTable.addAll(
                  createPinotTaskConfigs(outlierSegments, tableConfig, maxNumRecordsPerTask, mergeLevel,
                      null, mergeConfigs, taskConfigs));
            }
          }
        }
      }

      // Write updated watermark map to zookeeper
      if (!processAll) {
        try {
          _clusterInfoAccessor
              .setMinionTaskMetadata(mergeRollupTaskMetadata, MinionConstants.MergeRollupTask.TASK_TYPE,
                  expectedVersion);
        } catch (ZkException e) {
          LOGGER.error(
              "Version changed while updating merge/rollup task metadata for table: {}, skipping task scheduling. "
                  + "There are likely multiple task schedulers for the same table; this needs investigation!",
              tableNameWithType);
          continue;
        }
      }
      pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
      LOGGER.info("Finished generating task configs for table: {} for task: {}, numTasks: {}", tableNameWithType,
          taskType, pinotTaskConfigsForTable.size());
    }

    // Clean up metrics
    cleanUpDelayMetrics(tableConfigs);

    return pinotTaskConfigs;
  }
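
A minimal sketch of how a caller might drive this generator (hypothetical wiring; in a real deployment the controller's PinotTaskManager invokes registered task generators, and the ClusterInfoAccessor setup is elided):

    // Hypothetical driver code, assuming clusterInfoAccessor and tableConfig are already built.
    MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
    generator.init(clusterInfoAccessor); // injects the accessor used for ZK metadata and task states
    List<PinotTaskConfig> tasks = generator.generateTasks(Collections.singletonList(tableConfig));
    for (PinotTaskConfig task : tasks) {
      // Each config carries, among other entries, the segment names to merge and the mergeLevel key.
      System.out.println(task.getTaskType() + " -> " + task.getConfigs());
    }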