public void run()

in pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java [122:331]


  public void run()
      throws Exception {
    //init all file systems
    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
      PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
    }
    //Get list of files to process
    URI inputDirURI = new URI(_spec.getInputDirURI());
    if (inputDirURI.getScheme() == null) {
      inputDirURI = new File(_spec.getInputDirURI()).toURI();
    }
    PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
    List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI,
        _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively());
    LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size());
    //Get outputFS for writing output pinot segments
    URI outputDirURI = new URI(_spec.getOutputDirURI());
    if (outputDirURI.getScheme() == null) {
      outputDirURI = new File(_spec.getOutputDirURI()).toURI();
    }
    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
    outputDirFS.mkdir(outputDirURI);

    //Get staging directory for temporary output pinot segments
    String stagingDir = _spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR);
    URI stagingDirURI = null;
    if (stagingDir != null) {
      stagingDirURI = URI.create(stagingDir);
      if (stagingDirURI.getScheme() == null) {
        stagingDirURI = new File(stagingDir).toURI();
      }
      if (!outputDirURI.getScheme().equals(stagingDirURI.getScheme())) {
        throw new RuntimeException(String
            .format("The scheme of the staging directory URI [%s] and the output directory URI [%s] must be the same.",
                stagingDirURI, outputDirURI));
      }
      outputDirFS.mkdir(stagingDirURI);
    }
    try {
      JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());

      // Pinot plugins are necessary to launch the Pinot ingestion job on every executor.
      // To ensure the plugins are available on each worker, this method tars the entire
      // plugins directory and adds the tarball to the Spark distributed cache.
      // Each executor then untars the plugin tarball and sets system properties accordingly.
      packPluginsToDistributedCache(sparkContext);

      // Add dependency jars
      if (_spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) {
        addDepsJarToDistributedCache(sparkContext,
            _spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR));
      }

      List<String> pathAndIdxList = new ArrayList<>();
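      // Pair each input file with a sequence id: depending on the segment name generator spec,
      // files are either numbered within their parent directory or numbered globally across all
      // matched files.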
      if (!SegmentGenerationJobUtils.useGlobalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
        Map<String, List<String>> localDirIndex = new HashMap<>();
        for (String filteredFile : filteredFiles) {
          Path filteredParentPath = Paths.get(filteredFile).getParent();
          if (!localDirIndex.containsKey(filteredParentPath.toString())) {
            localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
          }
          localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
        }
        for (String parentPath : localDirIndex.keySet()) {
          List<String> siblingFiles = localDirIndex.get(parentPath);
          Collections.sort(siblingFiles);
          for (int i = 0; i < siblingFiles.size(); i++) {
            pathAndIdxList.add(siblingFiles.get(i) + " " + i);
          }
        }
      } else {
        for (int i = 0; i < filteredFiles.size(); i++) {
          pathAndIdxList.add(filteredFiles.get(i) + " " + i);
        }
      }
      int numDataFiles = pathAndIdxList.size();
      int jobParallelism = _spec.getSegmentCreationJobParallelism();
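      // Clamp the parallelism: a non-positive value, or a value larger than the number of input
      // files, falls back to one Spark task per file.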
      if (jobParallelism <= 0 || jobParallelism > numDataFiles) {
        jobParallelism = numDataFiles;
      }

      JavaRDD<String> pathRDD = sparkContext.parallelize(pathAndIdxList, jobParallelism);

      final String pluginsInclude =
          (sparkContext.getConf().contains(PLUGINS_INCLUDE_PROPERTY_NAME)) ? sparkContext.getConf()
              .get(PLUGINS_INCLUDE_PROPERTY_NAME) : null;
      final URI finalInputDirURI = inputDirURI;
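      // Executors write segments into the staging directory when one is configured; the driver
      // moves them to the final output directory after the Spark job completes.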
      final URI finalOutputDirURI = (stagingDirURI == null) ? outputDirURI : stagingDirURI;
      // Use an anonymous inner class instead of a lambda expression to avoid potential Spark
      // serialization exceptions.
      pathRDD.foreach(new VoidFunction<String>() {
        @Override
        public void call(String pathAndIdx)
            throws Exception {
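          // Re-initialize plugins and re-register Pinot file systems inside the executor JVM;
          // registrations done on the driver are not carried over to the workers.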
          PluginManager.get().init();
          for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
            PinotFSFactory
                .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
          }
          PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
          String[] splits = pathAndIdx.split(" ");
          String path = splits[0];
          int idx = Integer.parseInt(splits[1]);
          // Load Pinot plugins copied from the distributed cache.
          File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
          if (localPluginsTarFile.exists()) {
            File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
            try {
              TarCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
            } catch (Exception e) {
              LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
              throw new RuntimeException(e);
            }
            LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME,
                pluginsDirFile.getAbsolutePath());
            System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath());
            if (pluginsInclude != null) {
              LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
              System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
            }
            LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]",
                System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
          } else {
            LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath());
          }
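          // Resolve the input file URI; a scheme-less path inherits the scheme of the input directory.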
          URI inputFileURI = URI.create(path);
          if (inputFileURI.getScheme() == null) {
            inputFileURI =
                new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
          }

          //create localTempDir for input and output
          File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
          File localInputTempDir = new File(localTempDir, "input");
          FileUtils.forceMkdir(localInputTempDir);
          File localOutputTempDir = new File(localTempDir, "output");
          FileUtils.forceMkdir(localOutputTempDir);

          //copy input path to local
          File localInputDataFile = new File(localInputTempDir, getFileName(inputFileURI));
          LOGGER.info("Trying to copy input file from {} to {}", inputFileURI, localInputDataFile);
          PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, localInputDataFile);

          //create task spec
          SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
          taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
          taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
          taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
          taskSpec
              .setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken()));
          taskSpec.setTableConfig(
              SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken()));
          taskSpec.setSequenceId(idx);
          taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
          taskSpec.setFailOnEmptySegment(_spec.isFailOnEmptySegment());
          taskSpec.setCreateMetadataTarGz(_spec.isCreateMetadataTarGz());
          taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());

          SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
          String segmentName = taskRunner.run();

          // Tar the segment directory into a compressed file
          File localSegmentDir = new File(localOutputTempDir, segmentName);
          String segmentTarFileName = URIUtils.encode(segmentName + Constants.TAR_GZ_FILE_EXT);
          File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
          LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
          TarCompressionUtils.createCompressedTarFile(localSegmentDir, localSegmentTarFile);
          long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
          long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
          LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
              DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
          // Move segment to output PinotFS
          URI relativeOutputPath =
              SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI);
          URI outputSegmentTarURI = relativeOutputPath.resolve(segmentTarFileName);
          SegmentGenerationJobUtils.moveLocalTarFileToRemote(localSegmentTarFile, outputSegmentTarURI,
              _spec.isOverwriteOutput());

          // Create and upload segment metadata tar file
          String metadataTarFileName = URIUtils.encode(segmentName + Constants.METADATA_TAR_GZ_FILE_EXT);
          URI outputMetadataTarURI = relativeOutputPath.resolve(metadataTarFileName);

          if (finalOutputDirFS.exists(outputMetadataTarURI) && (_spec.isOverwriteOutput()
              || !_spec.isCreateMetadataTarGz())) {
            LOGGER.info("Deleting existing metadata tar gz file: {}", outputMetadataTarURI);
            finalOutputDirFS.delete(outputMetadataTarURI, true);
          }
          if (taskSpec.isCreateMetadataTarGz()) {
            File localMetadataTarFile = new File(localOutputTempDir, metadataTarFileName);
            SegmentGenerationJobUtils.createSegmentMetadataTarGz(localSegmentDir, localMetadataTarFile);
            SegmentGenerationJobUtils.moveLocalTarFileToRemote(localMetadataTarFile, outputMetadataTarURI,
                _spec.isOverwriteOutput());
          }
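          // Best-effort cleanup of the local working copies on the executor.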
          FileUtils.deleteQuietly(localSegmentDir);
          FileUtils.deleteQuietly(localInputDataFile);
        }
      });
      if (stagingDirURI != null) {
        LOGGER.info("Trying to move segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
                outputDirURI);
        SegmentGenerationJobUtils.moveFiles(outputDirFS, stagingDirURI, outputDirURI, true);
      }
    } finally {
      if (stagingDirURI != null) {
        LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
        outputDirFS.delete(stagingDirURI, true);
      }
    }
  }
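
For context, below is a minimal driver-side sketch of how a runner like this is typically wired up. It is an illustration based on assumptions, not code from the repository: it presumes the runner follows the standard IngestionJobRunner init/run contract and that SegmentGenerationJobSpec and PinotFSSpec expose setters matching the getters used in the method above; all paths, class names, and values are hypothetical, and in practice the job is usually launched through spark-submit with the spec deserialized from a YAML job spec file.

import java.util.Collections;

import org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;

public class SparkIngestionDriverSketch {
  public static void main(String[] args)
      throws Exception {
    // Hypothetical job spec; in practice this is deserialized from a YAML job spec file.
    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
    spec.setInputDirURI("s3://example-bucket/rawdata/");   // hypothetical input location
    spec.setOutputDirURI("s3://example-bucket/segments/"); // hypothetical output location
    spec.setSegmentCreationJobParallelism(8);               // <= 0 means one task per input file

    // Register the file system plugin for the s3:// scheme (class name assumed).
    PinotFSSpec s3FsSpec = new PinotFSSpec();
    s3FsSpec.setScheme("s3");
    s3FsSpec.setClassName("org.apache.pinot.plugin.filesystem.S3PinotFS");
    spec.setPinotFSSpecs(Collections.singletonList(s3FsSpec));

    // tableSpec, recordReaderSpec, segmentNameGeneratorSpec, executionFrameworkSpec, etc.
    // would be populated the same way before running.

    SparkSegmentGenerationJobRunner runner = new SparkSegmentGenerationJobRunner();
    runner.init(spec); // assumed IngestionJobRunner contract: init(spec) followed by run()
    runner.run();
  }
}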