public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig)

in pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java [179:353]
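
Downloads the input segments, converts them, tars and uploads the converted segments, and cleans up all temporary files when done.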


  public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig)
      throws Exception {
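    // Run the pre-process hook, then resolve the task context from the task config: table name, input segment
    // names, download URLs, upload URL, and auth provider.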
    preProcess(pinotTaskConfig);
    _pinotTaskConfig = pinotTaskConfig;
    _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
    String taskType = pinotTaskConfig.getTaskType();
    Map<String, String> taskConfigs = pinotTaskConfig.getConfigs();
    String tableNameWithType = taskConfigs.get(MinionConstants.TABLE_NAME_KEY);
    String inputSegmentNames = taskConfigs.get(MinionConstants.SEGMENT_NAME_KEY);
    String[] segmentNames = inputSegmentNames.split(MinionConstants.SEGMENT_NAME_SEPARATOR);
    String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
    String downloadURLString = taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY);
    String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN));
    LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType,
        tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
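    // Create a unique temporary working directory for this task run under the minion data dir.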
    File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
    Preconditions.checkState(tempDataDir.mkdirs());
    try {
      List<File> inputSegmentDirs = new ArrayList<>();
      int numRecords = 0;

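      // Download each input segment, untar it locally, and accumulate the total record count for metrics.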
      for (int i = 0; i < downloadURLs.length; i++) {
        String segmentName = segmentNames[i];
        // Download and decompress the segment file
        _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and decompressing segment from: " + downloadURLs[i]
            + " (" + (i + 1) + " out of " + downloadURLs.length + ")");
        File indexDir;
        try {
          indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, segmentName, downloadURLs[i], taskType,
              tempDataDir, "_" + i);
        } catch (Exception e) {
          LOGGER.error("Failed to download segment from download url: {}", downloadURLs[i], e);
          _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
          _eventObserver.notifyTaskError(_pinotTaskConfig, e);
          throw e;
        }
        inputSegmentDirs.add(indexDir);

        reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
        SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
        numRecords += segmentMetadata.getTotalDocs();
      }

      // Convert the segments
      File workingDir = new File(tempDataDir, "workingDir");
      Preconditions.checkState(workingDir.mkdir());
      List<SegmentConversionResult> segmentConversionResults = convert(pinotTaskConfig, inputSegmentDirs, workingDir);

      reportTaskProcessingMetrics(tableNameWithType, taskType, numRecords);

      // Create a directory for converted tarred segment files
      File convertedTarredSegmentDir = new File(tempDataDir, "convertedTarredSegmentDir");
      Preconditions.checkState(convertedTarredSegmentDir.mkdir());

      int numOutputSegments = segmentConversionResults.size();
      List<File> tarredSegmentFiles = new ArrayList<>(numOutputSegments);
      int count = 1;
      for (SegmentConversionResult segmentConversionResult : segmentConversionResults) {
        File convertedSegmentDir = segmentConversionResult.getFile();
        reportSegmentUploadMetrics(convertedSegmentDir, tableNameWithType, taskType);

        // Tar the converted segment
        _eventObserver.notifyProgress(_pinotTaskConfig, "Compressing segment: "
            + segmentConversionResult.getSegmentName() + " (" + (count++) + " out of " + numOutputSegments + ")");
        File convertedSegmentTarFile = new File(convertedTarredSegmentDir,
            segmentConversionResult.getSegmentName() + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
        TarCompressionUtils.createCompressedTarFile(convertedSegmentDir, convertedSegmentTarFile);
        tarredSegmentFiles.add(convertedSegmentTarFile);
        if (!FileUtils.deleteQuietly(convertedSegmentDir)) {
          LOGGER.warn("Failed to delete converted segment: {}", convertedSegmentDir.getAbsolutePath());
        }
      }

      // Delete the input segments only after tarring the converted segments, to avoid deleting a converted segment
      // when the conversion happens in-place (the converted segment dir is the same as the input segment dir). The
      // same situation arises when no conversion is required and the input segment dir is returned as the result.
      for (File inputSegmentDir : inputSegmentDirs) {
        if (inputSegmentDir.exists() && !FileUtils.deleteQuietly(inputSegmentDir)) {
          LOGGER.warn("Failed to delete input segment: {}", inputSegmentDir.getAbsolutePath());
        }
      }

      // Check whether the task was cancelled before uploading the segments
      if (_cancelled) {
        LOGGER.info("{} on table: {}, segments: {} got cancelled", taskType, tableNameWithType, inputSegmentNames);
        throw new TaskCancelledException(
            taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
      }

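      // Prepare the upload: build the upload context, run the pre-upload hook, and read the push job spec to decide
      // between per-segment upload and a single batch upload.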
      SegmentUploadContext segmentUploadContext = new SegmentUploadContext(pinotTaskConfig, segmentConversionResults);
      preUploadSegments(segmentUploadContext);
      Map<String, String> segmentUriToTarPathMap = new HashMap<>();
      PushJobSpec pushJobSpec = getPushJobSpec(taskConfigs);
      boolean batchSegmentUpload = pushJobSpec.isBatchSegmentUpload();

      // Upload the tarred segments
      for (int i = 0; i < numOutputSegments; i++) {
        File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
        SegmentConversionResult segmentConversionResult = segmentConversionResults.get(i);
        String resultSegmentName = segmentConversionResult.getSegmentName();
        _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " + resultSegmentName + " (" + (i + 1)
            + " out of " + numOutputSegments + ")");
        String pushMode = taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE,
            BatchConfigProperties.SegmentPushType.TAR.name());
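        // For non-TAR push modes (e.g. URI or METADATA push), move the tar file to the output PinotFS and push from
        // there; for TAR push, upload the local tar file directly.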
        URI outputSegmentTarURI;
        if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
            != BatchConfigProperties.SegmentPushType.TAR) {
          outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, convertedTarredSegmentFile);
          LOGGER.info("Moved generated segment from [{}] to location: [{}]", convertedTarredSegmentFile,
              outputSegmentTarURI);
        } else {
          outputSegmentTarURI = convertedTarredSegmentFile.toURI();
        }

        // Set the segment ZK metadata custom map modifier in the HTTP headers to modify the segment ZK metadata
        List<Header> httpHeaders = getSegmentPushCommonHeaders(pinotTaskConfig, authProvider, segmentConversionResults);
        // Set parameters for upload request
        List<NameValuePair> parameters = getSegmentPushCommonParams(tableNameWithType);

        // RealtimeToOfflineSegmentsTask pushes segments to the corresponding offline table
        // TODO: Putting the override here is not clean; think harder about the proper way to override the table type.
        if (MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE.equals(taskType)) {
          Iterator<NameValuePair> paramItr = parameters.iterator();
          while (paramItr.hasNext()) {
            NameValuePair nameValuePair = paramItr.next();
            if (FileUploadDownloadClient.QueryParameters.TABLE_TYPE.equals(nameValuePair.getName())) {
              paramItr.remove();
              break;
            }
          }
          parameters.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
              TableType.OFFLINE.toString()));
        }

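        // In batch mode, defer the upload: record the segment URI to tar path mapping and push everything at once
        // after this loop. Otherwise, push this segment immediately and delete its tar file.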
        if (batchSegmentUpload) {
          updateSegmentUriToTarPathMap(taskConfigs, outputSegmentTarURI, segmentConversionResult,
              segmentUriToTarPathMap, pushJobSpec);
        } else {
          String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
          pushSegment(rawTableName, taskConfigs, outputSegmentTarURI, httpHeaders, parameters, segmentConversionResult);
          if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
            LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
          }
        }
      }

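      // Batch mode: push all collected segments in one call, then delete the tar files regardless of the outcome.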
      if (batchSegmentUpload) {
        try {
          pushSegments(tableNameWithType, taskConfigs, pinotTaskConfig, segmentUriToTarPathMap, pushJobSpec,
              authProvider, segmentConversionResults);
        } finally {
          for (File convertedTarredSegmentFile : tarredSegmentFiles) {
            if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
              LOGGER.warn("Failed to delete converted tarred segment file: {}",
                  convertedTarredSegmentFile.getAbsolutePath());
            }
          }
        }
      }

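      // Post-upload hook, the counterpart of preUploadSegments above.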
      postUploadSegments(segmentUploadContext);

      String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
          .collect(Collectors.joining(","));
      postProcess(pinotTaskConfig);
      LOGGER.info("Done executing {} on table: {}, input segments: {}, output segments: {}", taskType,
          tableNameWithType, inputSegmentNames, outputSegmentNames);

      return segmentConversionResults;
    } finally {
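      // Always clean up the temporary working directory, whether the task succeeded, failed, or was cancelled.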
      FileUtils.deleteQuietly(tempDataDir);
    }
  }