in pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java [179:353]
public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig)
    throws Exception {
  // Executes a multiple-segments conversion task:
  //   1. download and untar every input segment listed in the task config,
  //   2. convert them via convert(...),
  //   3. tar every converted segment,
  //   4. upload the tarred segments (one by one, or as a single batch when the push job spec asks for it).
  // All intermediate files live under a per-execution temp directory that is removed in the outer finally block.
  //
  // Returns the conversion results produced by convert(...). Throws TaskCancelledException when the task got
  // cancelled before the upload phase, IllegalStateException when the task config or working directories are
  // unusable, and propagates any exception raised by the download/convert/upload steps.
  preProcess(pinotTaskConfig);
  _pinotTaskConfig = pinotTaskConfig;
  _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
  String taskType = pinotTaskConfig.getTaskType();
  Map<String, String> taskConfigs = pinotTaskConfig.getConfigs();
  String tableNameWithType = taskConfigs.get(MinionConstants.TABLE_NAME_KEY);
  String inputSegmentNames = taskConfigs.get(MinionConstants.SEGMENT_NAME_KEY);
  String[] segmentNames = inputSegmentNames.split(MinionConstants.SEGMENT_NAME_SEPARATOR);
  String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
  String downloadURLString = taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY);
  String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
  AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN));
  LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType,
      tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);

  // Segment names and download URLs are paired by index below. Fail fast (before creating any directory or
  // downloading anything) when a download URL has no matching segment name, instead of hitting an
  // ArrayIndexOutOfBoundsException halfway through the download loop.
  Preconditions.checkState(downloadURLs.length <= segmentNames.length,
      "Got %s download URLs but only %s segment names in the task config", downloadURLs.length,
      segmentNames.length);

  File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
  Preconditions.checkState(tempDataDir.mkdirs(), "Failed to create temp data dir: %s", tempDataDir);
  try {
    List<File> inputSegmentDirs = new ArrayList<>();
    int numRecords = 0;

    for (int i = 0; i < downloadURLs.length; i++) {
      String segmentName = segmentNames[i];
      // Download and decompress the segment file
      _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and decompressing segment from: " + downloadURLs[i]
          + " (" + (i + 1) + " out of " + downloadURLs.length + ")");
      File indexDir;
      try {
        // The "_" + i suffix is passed through to the download helper, one distinct value per input segment
        indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, segmentName, downloadURLs[i], taskType,
            tempDataDir, "_" + i);
      } catch (Exception e) {
        // Record the failure (log, metric, observer) and rethrow so the task fails
        LOGGER.error("Failed to download segment from download url: {}", downloadURLs[i], e);
        _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
        _eventObserver.notifyTaskError(_pinotTaskConfig, e);
        throw e;
      }
      inputSegmentDirs.add(indexDir);

      reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
      SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
      numRecords += segmentMetadata.getTotalDocs();
    }

    // Convert the segments
    File workingDir = new File(tempDataDir, "workingDir");
    Preconditions.checkState(workingDir.mkdir(), "Failed to create working dir: %s", workingDir);
    List<SegmentConversionResult> segmentConversionResults = convert(pinotTaskConfig, inputSegmentDirs, workingDir);
    reportTaskProcessingMetrics(tableNameWithType, taskType, numRecords);

    // Create a directory for converted tarred segment files
    File convertedTarredSegmentDir = new File(tempDataDir, "convertedTarredSegmentDir");
    Preconditions.checkState(convertedTarredSegmentDir.mkdir(), "Failed to create converted tarred segment dir: %s",
        convertedTarredSegmentDir);

    int numOutputSegments = segmentConversionResults.size();
    List<File> tarredSegmentFiles = new ArrayList<>(numOutputSegments);
    int count = 1;
    for (SegmentConversionResult segmentConversionResult : segmentConversionResults) {
      File convertedSegmentDir = segmentConversionResult.getFile();
      reportSegmentUploadMetrics(convertedSegmentDir, tableNameWithType, taskType);

      // Tar the converted segment
      _eventObserver.notifyProgress(_pinotTaskConfig, "Compressing segment: "
          + segmentConversionResult.getSegmentName() + " (" + (count++) + " out of " + numOutputSegments + ")");
      File convertedSegmentTarFile = new File(convertedTarredSegmentDir,
          segmentConversionResult.getSegmentName() + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
      TarCompressionUtils.createCompressedTarFile(convertedSegmentDir, convertedSegmentTarFile);
      tarredSegmentFiles.add(convertedSegmentTarFile);
      // The untarred converted segment is no longer needed once the tar file exists
      if (!FileUtils.deleteQuietly(convertedSegmentDir)) {
        LOGGER.warn("Failed to delete converted segment: {}", convertedSegmentDir.getAbsolutePath());
      }
    }

    // Delete the input segment after tarring the converted segment to avoid deleting the converted segment when the
    // conversion happens in-place (converted segment dir is the same as input segment dir). It could also happen when
    // the conversion is not required, and the input segment dir is returned as the result.
    for (File inputSegmentDir : inputSegmentDirs) {
      if (inputSegmentDir.exists() && !FileUtils.deleteQuietly(inputSegmentDir)) {
        LOGGER.warn("Failed to delete input segment: {}", inputSegmentDir.getAbsolutePath());
      }
    }

    // Check whether the task get cancelled before uploading the segment
    if (_cancelled) {
      LOGGER.info("{} on table: {}, segments: {} got cancelled", taskType, tableNameWithType, inputSegmentNames);
      throw new TaskCancelledException(
          taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
    }

    SegmentUploadContext segmentUploadContext = new SegmentUploadContext(pinotTaskConfig, segmentConversionResults);
    preUploadSegments(segmentUploadContext);
    Map<String, String> segmentUriToTarPathMap = new HashMap<>();
    PushJobSpec pushJobSpec = getPushJobSpec(taskConfigs);
    boolean batchSegmentUpload = pushJobSpec.isBatchSegmentUpload();

    // Upload the tarred segments
    for (int i = 0; i < numOutputSegments; i++) {
      File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
      SegmentConversionResult segmentConversionResult = segmentConversionResults.get(i);
      String resultSegmentName = segmentConversionResult.getSegmentName();
      _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " + resultSegmentName + " (" + (i + 1)
          + " out of " + numOutputSegments + ")");

      String pushMode = taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE,
          BatchConfigProperties.SegmentPushType.TAR.name());
      // Upper-case with Locale.ROOT so that the enum lookup does not depend on the JVM default locale (e.g. under a
      // Turkish locale "uri" would otherwise become "URİ" and valueOf() would fail).
      BatchConfigProperties.SegmentPushType pushType =
          BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase(java.util.Locale.ROOT));
      URI outputSegmentTarURI;
      if (pushType != BatchConfigProperties.SegmentPushType.TAR) {
        // Any push mode other than TAR first moves the tar file to the output PinotFS location
        outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, convertedTarredSegmentFile);
        LOGGER.info("Moved generated segment from [{}] to location: [{}]", convertedTarredSegmentFile,
            outputSegmentTarURI);
      } else {
        outputSegmentTarURI = convertedTarredSegmentFile.toURI();
      }

      // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
      List<Header> httpHeaders = getSegmentPushCommonHeaders(pinotTaskConfig, authProvider, segmentConversionResults);

      // Set parameters for upload request
      List<NameValuePair> parameters = getSegmentPushCommonParams(tableNameWithType);

      // RealtimeToOfflineSegmentsTask pushed segments to the corresponding offline table
      // TODO: This is not clean to put the override here, but let's think about it harder to see what is the proper
      //       way to override it.
      if (MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE.equals(taskType)) {
        // Drop the first existing table type parameter (if any) and replace it with OFFLINE
        Iterator<NameValuePair> paramItr = parameters.iterator();
        while (paramItr.hasNext()) {
          NameValuePair nameValuePair = paramItr.next();
          if (FileUploadDownloadClient.QueryParameters.TABLE_TYPE.equals(nameValuePair.getName())) {
            paramItr.remove();
            break;
          }
        }
        parameters.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
            TableType.OFFLINE.toString()));
      }

      if (batchSegmentUpload) {
        // Batch mode: only collect the segment here, the actual push happens once after the loop
        updateSegmentUriToTarPathMap(taskConfigs, outputSegmentTarURI, segmentConversionResult,
            segmentUriToTarPathMap, pushJobSpec);
      } else {
        String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
        pushSegment(rawTableName, taskConfigs, outputSegmentTarURI, httpHeaders, parameters, segmentConversionResult);
        if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
          LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
        }
      }
    }

    if (batchSegmentUpload) {
      try {
        pushSegments(tableNameWithType, taskConfigs, pinotTaskConfig, segmentUriToTarPathMap, pushJobSpec,
            authProvider, segmentConversionResults);
      } finally {
        // Tar files are kept until the batch push finishes (successfully or not), then removed
        for (File convertedTarredSegmentFile : tarredSegmentFiles) {
          if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
            LOGGER.warn("Failed to delete converted tarred segment file: {}",
                convertedTarredSegmentFile.getAbsolutePath());
          }
        }
      }
    }

    postUploadSegments(segmentUploadContext);

    String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
        .collect(Collectors.joining(","));
    postProcess(pinotTaskConfig);
    LOGGER.info("Done executing {} on table: {}, input segments: {}, output segments: {}", taskType,
        tableNameWithType, inputSegmentNames, outputSegmentNames);

    return segmentConversionResults;
  } finally {
    // Best-effort cleanup of everything created for this execution, on success and on failure alike
    FileUtils.deleteQuietly(tempDataDir);
  }
}