in pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java [124:338]
  public void run()
      throws Exception {
    //init all file systems
    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
      PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
    }
    //Get list of files to process
    URI inputDirURI = new URI(_spec.getInputDirURI());
    if (inputDirURI.getScheme() == null) {
      inputDirURI = new File(_spec.getInputDirURI()).toURI();
    }
    PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
    List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI,
        _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively());
    LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size());
    TableConfig tableConfig =
        SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken());
    boolean consistentPushEnabled = ConsistentDataPushUtils.consistentDataPushEnabled(tableConfig);
    if (consistentPushEnabled) {
      ConsistentDataPushUtils.configureSegmentPostfix(_spec);
    }
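    // When consistent data push is enabled, a unique per-push postfix is configured on the segment name generator
    // spec so that segments produced by this run can be distinguished from previously pushed segments.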
    //Get outputFS for writing output pinot segments
    URI outputDirURI = new URI(_spec.getOutputDirURI());
    if (outputDirURI.getScheme() == null) {
      outputDirURI = new File(_spec.getOutputDirURI()).toURI();
    }
    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
    outputDirFS.mkdir(outputDirURI);
    //Get staging directory for temporary output pinot segments
    String stagingDir = _spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR);
    URI stagingDirURI = null;
    if (stagingDir != null) {
      stagingDirURI = URI.create(stagingDir);
      if (stagingDirURI.getScheme() == null) {
        stagingDirURI = new File(stagingDir).toURI();
      }
      if (!outputDirURI.getScheme().equals(stagingDirURI.getScheme())) {
        throw new RuntimeException("The scheme of staging directory URI [" + stagingDirURI + "] and output directory "
            + "URI [" + outputDirURI + "] have to be the same.");
      }
      outputDirFS.mkdir(stagingDirURI);
    }
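    // When a staging directory is configured, executors write segment tars there first; the files are promoted to
    // the output directory only after the whole Spark job succeeds (see the moveFiles call after the foreach below),
    // and the staging directory is cleaned up in the finally block.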
    try {
      JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
      // Pinot plugins are necessary to launch the Pinot ingestion job from every mapper.
      // To ensure the plugins are available on each worker, this tars the entire plugins directory and adds the
      // tarball to the distributed cache. Each executor then untars the plugin tarball and sets the corresponding
      // system properties.
      packPluginsToDistributedCache(sparkContext);
      // Add dependency jars
      if (_spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) {
        addDepsJarToDistributedCache(sparkContext,
            _spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR));
      }
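      // Build one RDD entry per input file, encoded as "<filePath> <sequenceId>". The sequence id is assigned either
      // within each parent directory or globally across all input files, depending on the segment name generator spec.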
      List<String> pathAndIdxList = new ArrayList<>();
      if (!SegmentGenerationJobUtils.useGlobalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
        Map<String, List<String>> localDirIndex = new HashMap<>();
        for (String filteredFile : filteredFiles) {
          Path filteredParentPath = Paths.get(filteredFile).getParent();
          if (!localDirIndex.containsKey(filteredParentPath.toString())) {
            localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
          }
          localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
        }
        for (String parentPath : localDirIndex.keySet()) {
          List<String> siblingFiles = localDirIndex.get(parentPath);
          Collections.sort(siblingFiles);
          for (int i = 0; i < siblingFiles.size(); i++) {
            pathAndIdxList.add(siblingFiles.get(i) + " " + i);
          }
        }
      } else {
        for (int i = 0; i < filteredFiles.size(); i++) {
          pathAndIdxList.add(filteredFiles.get(i) + " " + i);
        }
      }
      int numDataFiles = pathAndIdxList.size();
      int jobParallelism = _spec.getSegmentCreationJobParallelism();
      if (jobParallelism <= 0 || jobParallelism > numDataFiles) {
        jobParallelism = numDataFiles;
      }
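      // Distribute the file list across jobParallelism partitions; the configured parallelism is capped at the
      // number of input files, and a non-positive value falls back to one partition per file.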
      JavaRDD<String> pathRDD = sparkContext.parallelize(pathAndIdxList, jobParallelism);
      final String pluginsInclude =
          (sparkContext.getConf().contains(PLUGINS_INCLUDE_PROPERTY_NAME)) ? sparkContext.getConf()
              .get(PLUGINS_INCLUDE_PROPERTY_NAME) : null;
      final URI finalInputDirURI = inputDirURI;
      final URI finalOutputDirURI = (stagingDirURI == null) ? outputDirURI : stagingDirURI;
      // Avoid using a lambda expression here to prevent potential Spark serialization exceptions; use an anonymous
      // inner class instead.
      pathRDD.foreach(new VoidFunction<String>() {
        @Override
        public void call(String pathAndIdx)
            throws Exception {
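          // Each Spark executor runs in its own JVM, so the plugin manager and the Pinot file systems must be
          // re-initialized locally before any segment can be generated.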
          PluginManager.get().init();
          for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
            PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
                new PinotConfiguration(pinotFSSpec));
          }
          PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
          String[] splits = pathAndIdx.split(" ");
          String path = splits[0];
          int idx = Integer.valueOf(splits[1]);
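          // idx is the per-file sequence id encoded above; it names the per-task plugins directory and is later
          // passed to the segment name generator via taskSpec.setSequenceId(idx).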
          // Load Pinot Plugins copied from Distributed cache.
          File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
          if (localPluginsTarFile.exists()) {
            File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
            try {
              TarCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
            } catch (Exception e) {
              LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
              throw new RuntimeException(e);
            }
            LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME,
                pluginsDirFile.getAbsolutePath());
            System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath());
            if (pluginsInclude != null) {
              LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
              System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
            }
            LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]",
                System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
          } else {
            LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath());
          }
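          // Resolve the input file URI; paths without a scheme inherit the scheme of the input directory.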
          URI inputFileURI = URI.create(path);
          if (inputFileURI.getScheme() == null) {
            inputFileURI =
                new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
          }
          //create localTempDir for input and output
          File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
          File localInputTempDir = new File(localTempDir, "input");
          FileUtils.forceMkdir(localInputTempDir);
          File localOutputTempDir = new File(localTempDir, "output");
          FileUtils.forceMkdir(localOutputTempDir);
          //copy input path to local
          File localInputDataFile = new File(localInputTempDir, getFileName(inputFileURI));
          LOGGER.info("Trying to copy input file from {} to {}", inputFileURI, localInputDataFile);
          PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, localInputDataFile);
          //create task spec
          SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
          taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
          taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
          taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
          taskSpec.setSchema(
              SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken()));
          taskSpec.setTableConfig(
              SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken()));
          taskSpec.setSequenceId(idx);
          taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
          taskSpec.setFailOnEmptySegment(_spec.isFailOnEmptySegment());
          taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
          SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
          String segmentName = taskRunner.run();
          // Tar segment directory to compress file
          File localSegmentDir = new File(localOutputTempDir, segmentName);
          String segmentTarFileName = URIUtils.encode(segmentName + Constants.TAR_GZ_FILE_EXT);
          File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
          LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
          TarCompressionUtils.createCompressedTarFile(localSegmentDir, localSegmentTarFile);
          long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
          long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
          LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
              DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
          // Move segment to output PinotFS
          URI relativeOutputPath =
              SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI);
          URI outputSegmentTarURI = relativeOutputPath.resolve(segmentTarFileName);
          SegmentGenerationJobUtils.moveLocalTarFileToRemote(localSegmentTarFile, outputSegmentTarURI,
              _spec.isOverwriteOutput());
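          // The metadata tar typically contains only the segment metadata files (e.g. metadata.properties and
          // creation.meta), which allows pushing segment metadata to the controller without shipping the full
          // segment tar.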
          // Create and upload segment metadata tar file
          String metadataTarFileName = URIUtils.encode(segmentName + Constants.METADATA_TAR_GZ_FILE_EXT);
          URI outputMetadataTarURI = relativeOutputPath.resolve(metadataTarFileName);
          if (finalOutputDirFS.exists(outputMetadataTarURI) && (_spec.isOverwriteOutput()
              || !_spec.isCreateMetadataTarGz())) {
            LOGGER.info("Deleting existing metadata tar gz file: {}", outputMetadataTarURI);
            finalOutputDirFS.delete(outputMetadataTarURI, true);
          }
          if (taskSpec.isCreateMetadataTarGz()) {
            File localMetadataTarFile = new File(localOutputTempDir, metadataTarFileName);
            SegmentGenerationJobUtils.createSegmentMetadataTarGz(localSegmentDir, localMetadataTarFile);
            SegmentGenerationJobUtils.moveLocalTarFileToRemote(localMetadataTarFile, outputMetadataTarURI,
                _spec.isOverwriteOutput());
          }
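          // Best-effort cleanup of the local working copies; the generated tars have already been moved to the
          // output file system.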
          FileUtils.deleteQuietly(localSegmentDir);
          FileUtils.deleteQuietly(localInputDataFile);
        }
      });
      if (stagingDirURI != null) {
        LOGGER.info("Trying to move segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
            outputDirURI);
        SegmentGenerationJobUtils.moveFiles(outputDirFS, stagingDirURI, outputDirURI, true);
      }
    } finally {
      if (stagingDirURI != null) {
        LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
        outputDirFS.delete(stagingDirURI, true);
      }
    }
  }