in indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java [337:530]
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
{
  boolean indexGeneratorJobAttempted = false;
  boolean indexGeneratorJobSuccess = false;
  HadoopIngestionSpec indexerSchema = null;
  try {
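    // Ensure a still-running Hadoop job is killed if this task exits abnormally.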
    registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
    String hadoopJobIdFile = getHadoopJobIdFileName();
    logExtensionsConfig();
    final ClassLoader loader = buildClassLoader(toolbox);
    boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();

    HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
        spec,
        jsonMapper,
        new OverlordActionBasedUsedSegmentsRetriever(toolbox)
    );
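
    // Phase 1: determine partitions. The runner is instantiated through a separate
    // classloader so the Hadoop job's dependencies stay isolated from Druid's, and it
    // is driven reflectively since its class is not visible to this task's loader.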
    Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject(
        "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner",
        loader
    );
    determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner);

    String[] determinePartitionsInput = new String[]{
        toolbox.getJsonMapper().writeValueAsString(spec),
        toolbox.getConfig().getHadoopWorkingPath(),
        toolbox.getSegmentPusher().getPathForHadoop(),
        hadoopJobIdFile
    };

    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();

    Class<?> determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass();
    Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod(
        "runTask",
        determinePartitionsInput.getClass()
    );
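
    // Run the job with the isolated classloader installed as the thread context
    // classloader, restoring the original one afterwards.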
    try {
      Thread.currentThread().setContextClassLoader(loader);

      ingestionState = IngestionState.DETERMINE_PARTITIONS;

      final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke(
          determinePartitionsInnerProcessingRunner,
          new Object[]{determinePartitionsInput}
      );

      determineConfigStatus = toolbox
          .getJsonMapper()
          .readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class);

      indexerSchema = determineConfigStatus.getSchema();
      if (indexerSchema == null) {
        errorMsg = determineConfigStatus.getErrorMsg();
        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
        return TaskStatus.failure(getId(), errorMsg);
      }
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
    finally {
      Thread.currentThread().setContextClassLoader(oldLoader);
    }

    // If the interval was specified up front, we already hold a lock acquired before the task
    // started running; otherwise we need to acquire one now for the intervals just determined.
    String version;
    if (determineIntervals) {
      Interval interval = JodaUtils.umbrellaInterval(
          JodaUtils.condenseIntervals(
              indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals()
          )
      );
      final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
      // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the line below can fail with an HTTP timeout.
      final TaskLock lock = Preconditions.checkNotNull(
          toolbox.getTaskActionClient().submit(
              new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs)
          ),
          "Cannot acquire a lock for interval[%s]", interval
      );
      lock.assertNotRevoked();
      version = lock.getVersion();
    } else {
      Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());
      final TaskLock myLock = Iterables.getOnlyElement(locks);
      version = myLock.getVersion();
    }
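
    // An explicit version from the tuning config may replace the lock version, but only if it
    // sorts strictly earlier than the lock version; anything else fails the task.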
    final String specVersion = indexerSchema.getTuningConfig().getVersion();
    if (indexerSchema.getTuningConfig().isUseExplicitVersion()) {
      if (specVersion.compareTo(version) < 0) {
        version = specVersion;
      } else {
        String errMsg = StringUtils.format(
            "Spec version cannot be greater than or equal to the lock version. Spec version: [%s], lock version: [%s].",
            specVersion,
            version
        );
        log.error(errMsg);
        toolbox.getTaskReportFileWriter().write(getId(), null);
        return TaskStatus.failure(getId(), errMsg);
      }
    }

    log.info("Setting version to: %s", version);
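
    // Phase 2: build segments. The index-generator job runs through the same isolated
    // classloader, again invoked reflectively with the context classloader swapped in.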
    Object innerProcessingRunner = getForeignClassloaderObject(
        "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner",
        loader
    );
    buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner);

    String[] buildSegmentsInput = new String[]{
        toolbox.getJsonMapper().writeValueAsString(indexerSchema),
        version,
        hadoopJobIdFile
    };

    Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
    Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass());
    try {
      Thread.currentThread().setContextClassLoader(loader);

      ingestionState = IngestionState.BUILD_SEGMENTS;
      indexGeneratorJobAttempted = true;
      final String jobStatusString = (String) innerProcessingRunTask.invoke(
          innerProcessingRunner,
          new Object[]{buildSegmentsInput}
      );

      buildSegmentsStatus = toolbox.getJsonMapper().readValue(
          jobStatusString,
          HadoopIndexGeneratorInnerProcessingStatus.class
      );
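
      // The inner job signals success by returning the pushed segment and index file paths;
      // a null list means the job failed and carries an error message instead.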
      List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths =
          buildSegmentsStatus.getDataSegmentAndIndexZipFilePaths();
      if (dataSegmentAndIndexZipFilePaths != null) {
        indexGeneratorJobSuccess = true;
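        // Rename the segment index files written by the job to the final paths referenced
        // by the segments before handing those segments to the publisher.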
        renameSegmentIndexFilesJob(
            toolbox.getJsonMapper().writeValueAsString(indexerSchema),
            toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths)
        );
        ArrayList<DataSegment> segments = dataSegmentAndIndexZipFilePaths
            .stream()
            .map(DataSegmentAndIndexZipFilePath::getSegment)
            .collect(Collectors.toCollection(ArrayList::new));
        toolbox.publishSegments(segments);

        // Try to wait for segments to be loaded by the cluster if the tuning config specifies
        // a non-zero value for awaitSegmentAvailabilityTimeoutMillis
        if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) {
          ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
          waitForSegmentAvailability(
              toolbox,
              segments,
              spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis()
          );
        }

        ingestionState = IngestionState.COMPLETED;
        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
        return TaskStatus.success(getId());
      } else {
        errorMsg = buildSegmentsStatus.getErrorMsg();
        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
        return TaskStatus.failure(getId(), errorMsg);
      }
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
    finally {
      Thread.currentThread().setContextClassLoader(oldLoader);
    }
  }
  finally {
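    // Regardless of outcome, run the index-generator cleanup, telling it whether the job
    // was attempted and whether it succeeded so intermediate data can be handled accordingly.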
    indexerGeneratorCleanupJob(
        indexGeneratorJobAttempted,
        indexGeneratorJobSuccess,
        indexerSchema == null ? null : toolbox.getJsonMapper().writeValueAsString(indexerSchema)
    );
  }
}