in indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java [814:1017]
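/**
 * Generates segments from the given input source according to the pre-computed partition analysis and publishes
 * them transactionally, emitting tombstones for empty input intervals when running in REPLACE mode.
 */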
private TaskStatus generateAndPublishSegments(
    final TaskToolbox toolbox,
    final DataSchema dataSchema,
    final InputSource inputSource,
    final File tmpDir,
    final PartitionAnalysis partitionAnalysis
) throws IOException, InterruptedException
{
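  // Set up segment-generation metrics and register a realtime metrics monitor so this task reports ingest metrics.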
  final SegmentGenerationMetrics buildSegmentsSegmentGenerationMetrics = new SegmentGenerationMetrics();
  final TaskRealtimeMetricsMonitor metricsMonitor =
      TaskRealtimeMetricsMonitorBuilder.build(this, buildSegmentsSegmentGenerationMetrics, buildSegmentsMeters);
  toolbox.addMonitor(metricsMonitor);

  final PartitionsSpec partitionsSpec = partitionAnalysis.getPartitionsSpec();
  final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
  final long pushTimeout = tuningConfig.getPushTimeout();

  final SegmentAllocatorForBatch segmentAllocator;
  final SequenceNameFunction sequenceNameFunction;
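  // Choose a segment allocator based on the secondary partitioning type: hash and range partitioning use a
  // pre-determined (non-linear) allocation derived from the complete partition analysis, while dynamic (linear)
  // partitioning allocates segments incrementally as rows are processed.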
  switch (partitionsSpec.getType()) {
    case HASH:
    case RANGE:
      final SegmentAllocatorForBatch localSegmentAllocator = SegmentAllocators.forNonLinearPartitioning(
          toolbox,
          getDataSource(),
          baseSequenceName,
          dataSchema.getGranularitySpec(),
          null,
          (CompletePartitionAnalysis) partitionAnalysis
      );
      sequenceNameFunction = localSegmentAllocator.getSequenceNameFunction();
      segmentAllocator = localSegmentAllocator;
      break;
    case LINEAR:
      segmentAllocator = SegmentAllocators.forLinearPartitioning(
          toolbox,
          baseSequenceName,
          null,
          dataSchema,
          getTaskLockHelper(),
          getIngestionMode(),
          partitionAnalysis.getPartitionsSpec(),
          null
      );
      sequenceNameFunction = segmentAllocator.getSequenceNameFunction();
      break;
    default:
      throw new UOE("[%s] secondary partition type is not supported", partitionsSpec.getType());
  }

  final TransactionalSegmentPublisher publisher = buildSegmentPublisher(toolbox);
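  // If a compaction task is driving this task, the context carries that task's id; use it when creating the
  // appenderator so it is tracked under the supervising task rather than this one.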
  String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
  if (effectiveId == null) {
    effectiveId = getId();
  }
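  // Create a batch appenderator: it builds incremental indexes in memory, persists them to disk, and merges them
  // into segments that can be pushed to deep storage.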
  final Appenderator appenderator = BatchAppenderators.newAppenderator(
      effectiveId,
      toolbox.getAppenderatorsManager(),
      buildSegmentsSegmentGenerationMetrics,
      toolbox,
      dataSchema,
      tuningConfig,
      buildSegmentsMeters,
      buildSegmentsParseExceptionHandler
  );
  boolean exceptionOccurred = false;
  try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
    driver.startJob();

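    // Read rows from the input source, hand them to the driver, and push completed segments as they fill up.
    // The result pairs the pushed segments with their segment schema mapping.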
    Pair<SegmentsAndCommitMetadata, SegmentSchemaMapping> commitMetadataAndSchema = InputSourceProcessor.process(
        dataSchema,
        driver,
        partitionsSpec,
        inputSource,
        inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
        tmpDir,
        sequenceNameFunction,
        new DefaultIndexTaskInputRowIteratorBuilder(),
        buildSegmentsMeters,
        buildSegmentsParseExceptionHandler,
        pushTimeout
    );
    SegmentsAndCommitMetadata pushed = commitMetadataAndSchema.lhs;

    // When using time-chunk locks, we don't need to enumerate the segments being overwritten: publishing simply
    // overshadows every existing segment that overlaps the new ones. With segment locks, the locked existing
    // segments must be passed explicitly.
    final Set<DataSegment> inputSegments = getTaskLockHelper().isUseSegmentLock()
                                           ? getTaskLockHelper().getLockedExistingSegments()
                                           : null;
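    // Optionally annotate the published segments with the compaction state (enabled via task context) so that
    // auto-compaction can recognize already-compacted segments and skip them.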
    final boolean storeCompactionState = getContextValue(
        Tasks.STORE_COMPACTION_STATE_KEY,
        Tasks.DEFAULT_STORE_COMPACTION_STATE
    );
    final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction =
        addCompactionStateToSegments(
            storeCompactionState,
            toolbox,
            ingestionSchema
        );

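    // In REPLACE mode, input intervals that produced no data still need to be overshadowed; generate tombstone
    // segments for them so stale data in those intervals disappears after publishing.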
    Set<DataSegment> tombStones = Collections.emptySet();
    if (getIngestionMode() == IngestionMode.REPLACE) {
      // check whether to generate tombstones...
      TombstoneHelper tombstoneHelper = new TombstoneHelper(toolbox.getTaskActionClient());
      List<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervals(
          pushed.getSegments(),
          ingestionSchema.getDataSchema()
      );
      // now find the versions for the tombstone intervals
      Map<Interval, SegmentIdWithShardSpec> tombstonesAndVersions = new HashMap<>();
      for (Interval interval : tombstoneIntervals) {
        SegmentIdWithShardSpec segmentIdWithShardSpec = allocateNewSegmentForTombstone(
            ingestionSchema,
            interval.getStart()
        );
        tombstonesAndVersions.put(interval, segmentIdWithShardSpec);
      }
      tombStones = tombstoneHelper.computeTombstones(ingestionSchema.getDataSchema(), tombstonesAndVersions);
      log.debugSegments(tombStones, "To publish tombstones");
    }

    // Probably we can publish atomicUpdateGroup along with segments.
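    // Publish the pushed segments and tombstones in a single transaction; inputSegments (segment-lock mode only)
    // identifies the existing segments being replaced, and annotateFunction attaches the compaction state.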
    final SegmentsAndCommitMetadata published =
        awaitPublish(driver.publishAll(
            inputSegments,
            tombStones,
            publisher,
            annotateFunction,
            commitMetadataAndSchema.rhs
        ), pushTimeout);
    appenderator.close();

    // Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value
    // for awaitSegmentAvailabilityTimeoutMillis
    if (tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() > 0 && published != null) {
      ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
      ArrayList<DataSegment> segmentsToWaitFor = new ArrayList<>(published.getSegments());
      waitForSegmentAvailability(
          toolbox,
          segmentsToWaitFor,
          tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis()
      );
    }

    ingestionState = IngestionState.COMPLETED;
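    // A null result means the segments could not be published; report the task as failed.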
    if (published == null) {
      log.error("Failed to publish segments, aborting!");
      errorMsg = "Failed to publish segments.";
      updateAndWriteCompletionReports(toolbox);
      return TaskStatus.failure(
          getId(),
          errorMsg
      );
    } else {
      log.info(
          "Processed[%,d] events, unparseable[%,d], thrownAway[%,d].",
          buildSegmentsMeters.getProcessed(),
          buildSegmentsMeters.getUnparseable(),
          buildSegmentsMeters.getThrownAway()
      );
      log.info("Published [%s] segments", published.getSegments().size());

      // publish metrics:
      emitMetric(toolbox.getEmitter(), "ingest/tombstones/count", tombStones.size());
      // segments count metric is documented to include tombstones
      emitMetric(
          toolbox.getEmitter(),
          "ingest/segments/count",
          published.getSegments().size() + tombStones.size()
      );

      log.debugSegments(published.getSegments(), "Published segments");

      updateAndWriteCompletionReports(
          toolbox,
          // only applicable to the compaction use cases
          inputSource instanceof DruidInputSource
          ? (long) ((DruidInputSource) inputSource).getNumberOfSegmentsRead()
          : null,
          (long) published.getSegments().size()
      );
      return TaskStatus.success(getId());
    }
  }
  catch (TimeoutException | ExecutionException e) {
    exceptionOccurred = true;
    throw new RuntimeException(e);
  }
  catch (Exception e) {
    exceptionOccurred = true;
    throw e;
  }
  finally {
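    // Close the appenderator: abort in-flight work if an exception occurred, otherwise shut down gracefully,
    // then deregister the metrics monitor.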
    if (exceptionOccurred) {
      appenderator.closeNow();
    } else {
      appenderator.close();
    }
    toolbox.removeMonitor(metricsMonitor);
  }
}