private TaskStatus generateAndPublishSegments()

in indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java [814:1017]


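  /**
   * Builds segments from the given input source and publishes them transactionally: a segment allocator is chosen
   * from the secondary partitioning type, rows are read and indexed through a batch appenderator driver, tombstones
   * are added for empty replaced intervals in REPLACE mode, and metrics plus a completion report are emitted before
   * the final task status is returned.
   */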
  private TaskStatus generateAndPublishSegments(
      final TaskToolbox toolbox,
      final DataSchema dataSchema,
      final InputSource inputSource,
      final File tmpDir,
      final PartitionAnalysis partitionAnalysis
  ) throws IOException, InterruptedException
  {
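    // Register a metrics monitor so segment-generation and row-ingestion stats are reported while this task runs.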
    final SegmentGenerationMetrics buildSegmentsSegmentGenerationMetrics = new SegmentGenerationMetrics();
    final TaskRealtimeMetricsMonitor metricsMonitor =
        TaskRealtimeMetricsMonitorBuilder.build(this, buildSegmentsSegmentGenerationMetrics, buildSegmentsMeters);
    toolbox.addMonitor(metricsMonitor);

    final PartitionsSpec partitionsSpec = partitionAnalysis.getPartitionsSpec();
    final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
    final long pushTimeout = tuningConfig.getPushTimeout();

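    // Choose a segment allocator based on the secondary partitioning type: hash/range partitioning is driven by the
    // up-front partition analysis, while linear (dynamic) partitioning allocates segments as rows are ingested.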
    final SegmentAllocatorForBatch segmentAllocator;
    final SequenceNameFunction sequenceNameFunction;
    switch (partitionsSpec.getType()) {
      case HASH:
      case RANGE:
        final SegmentAllocatorForBatch localSegmentAllocator = SegmentAllocators.forNonLinearPartitioning(
            toolbox,
            getDataSource(),
            baseSequenceName,
            dataSchema.getGranularitySpec(),
            null,
            (CompletePartitionAnalysis) partitionAnalysis
        );
        sequenceNameFunction = localSegmentAllocator.getSequenceNameFunction();
        segmentAllocator = localSegmentAllocator;
        break;
      case LINEAR:
        segmentAllocator = SegmentAllocators.forLinearPartitioning(
            toolbox,
            baseSequenceName,
            null,
            dataSchema,
            getTaskLockHelper(),
            getIngestionMode(),
            partitionAnalysis.getPartitionsSpec(),
            null
        );
        sequenceNameFunction = segmentAllocator.getSequenceNameFunction();
        break;
      default:
        throw new UOE("[%s] secondary partition type is not supported", partitionsSpec.getType());
    }

    final TransactionalSegmentPublisher publisher = buildSegmentPublisher(toolbox);

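    // Use the appenderator-tracking task id from the context (set when this task runs on behalf of a compaction
    // task) as the appenderator id, falling back to this task's own id.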
    String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
    if (effectiveId == null) {
      effectiveId = getId();
    }

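    // Create a batch appenderator that builds and pushes segments, wired to the segment-generation metrics,
    // row-ingestion meters, and parse-exception handler configured above.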
    final Appenderator appenderator = BatchAppenderators.newAppenderator(
        effectiveId,
        toolbox.getAppenderatorsManager(),
        buildSegmentsSegmentGenerationMetrics,
        toolbox,
        dataSchema,
        tuningConfig,
        buildSegmentsMeters,
        buildSegmentsParseExceptionHandler
    );
    boolean exceptionOccurred = false;
    try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
      driver.startJob();

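      // Read rows from the input source, index them, and push segments as they fill up, honoring the push timeout.
      // The result pairs the pushed segments and commit metadata with the segment-to-schema mapping.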
      Pair<SegmentsAndCommitMetadata, SegmentSchemaMapping> commitMetadataAndSchema = InputSourceProcessor.process(
          dataSchema,
          driver,
          partitionsSpec,
          inputSource,
          inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
          tmpDir,
          sequenceNameFunction,
          new DefaultIndexTaskInputRowIteratorBuilder(),
          buildSegmentsMeters,
          buildSegmentsParseExceptionHandler,
          pushTimeout
      );

      SegmentsAndCommitMetadata pushed = commitMetadataAndSchema.lhs;
      // With time-chunk locking we don't have to specify which segments will be overwritten, because publishing
      // overwrites all existing segments that overlap the new ones. With segment locking we instead pass the
      // locked existing segments explicitly.
      final Set<DataSegment> inputSegments = getTaskLockHelper().isUseSegmentLock()
                                             ? getTaskLockHelper().getLockedExistingSegments()
                                             : null;
      final boolean storeCompactionState = getContextValue(
          Tasks.STORE_COMPACTION_STATE_KEY,
          Tasks.DEFAULT_STORE_COMPACTION_STATE
      );
      final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction =
          addCompactionStateToSegments(
              storeCompactionState,
              toolbox,
              ingestionSchema
          );

      Set<DataSegment> tombStones = Collections.emptySet();
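      // In REPLACE mode, intervals in the replaced range that received no data get tombstone segments so that
      // existing data there is overshadowed rather than left behind.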
      if (getIngestionMode() == IngestionMode.REPLACE) {
        // Determine which intervals in the replaced range received no data and therefore need tombstones.
        TombstoneHelper tombstoneHelper = new TombstoneHelper(toolbox.getTaskActionClient());

        List<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervals(
            pushed.getSegments(),
            ingestionSchema.getDataSchema()
        );
        // now find the versions for the tombstone intervals
        Map<Interval, SegmentIdWithShardSpec> tombstonesAndVersions = new HashMap<>();
        for (Interval interval : tombstoneIntervals) {
          SegmentIdWithShardSpec segmentIdWithShardSpec = allocateNewSegmentForTombstone(
              ingestionSchema,
              interval.getStart()
          );
          tombstonesAndVersions.put(interval, segmentIdWithShardSpec);
        }

        tombStones = tombstoneHelper.computeTombstones(ingestionSchema.getDataSchema(), tombstonesAndVersions);

        log.debugSegments(tombStones, "To publish tombstones");
      }

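      // Publish the pushed segments and any tombstones in a single transaction, applying the compaction-state
      // annotation and the collected segment schemas.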
      // Probably we can publish atomicUpdateGroup along with segments.
      final SegmentsAndCommitMetadata published =
          awaitPublish(driver.publishAll(
              inputSegments,
              tombStones,
              publisher,
              annotateFunction,
              commitMetadataAndSchema.rhs
          ), pushTimeout);
      appenderator.close();

      // Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value
      // for awaitSegmentAvailabilityTimeoutMillis
      if (tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() > 0 && published != null) {
        ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
        ArrayList<DataSegment> segmentsToWaitFor = new ArrayList<>(published.getSegments());
        waitForSegmentAvailability(
            toolbox,
            segmentsToWaitFor,
            tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis()
        );
      }

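      // Record the final ingestion state, emit row/segment metrics on success, and write the completion report.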
      ingestionState = IngestionState.COMPLETED;
      if (published == null) {
        log.error("Failed to publish segments, aborting!");
        errorMsg = "Failed to publish segments.";
        updateAndWriteCompletionReports(toolbox);
        return TaskStatus.failure(
            getId(),
            errorMsg
        );
      } else {
        log.info(
            "Processed[%,d] events, unparseable[%,d], thrownAway[%,d].",
            buildSegmentsMeters.getProcessed(),
            buildSegmentsMeters.getUnparseable(),
            buildSegmentsMeters.getThrownAway()
        );
        log.info("Published [%s] segments", published.getSegments().size());

        // publish metrics:
        emitMetric(toolbox.getEmitter(), "ingest/tombstones/count", tombStones.size());
        // segments count metric is documented to include tombstones
        emitMetric(toolbox.getEmitter(), "ingest/segments/count",
                   published.getSegments().size() + tombStones.size()
        );

        log.debugSegments(published.getSegments(), "Published segments");

        updateAndWriteCompletionReports(
            toolbox,
            // only applicable to the compaction use cases
            inputSource instanceof DruidInputSource
            ? (long) ((DruidInputSource) inputSource).getNumberOfSegmentsRead()
            : null,
            (long) published.getSegments().size()
        );
        return TaskStatus.success(getId());
      }
    }
    catch (TimeoutException | ExecutionException e) {
      exceptionOccurred = true;
      throw new RuntimeException(e);
    }
    catch (Exception e) {
      exceptionOccurred = true;
      throw e;
    }
    finally {
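      // On failure, close the appenderator immediately without waiting for in-flight persists or pushes to finish;
      // otherwise close it gracefully (the extra close() after the success-path close() is a no-op).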
      if (exceptionOccurred) {
        appenderator.closeNow();
      } else {
        appenderator.close();
      }
      toolbox.removeMonitor(metricsMonitor);
    }
  }