private TaskStatus runInternal(TaskToolbox toolbox)

in indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java [383:964]


  private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
  {
    startTime = DateTimes.nowUtc();
    status = Status.STARTING;

    setToolbox(toolbox);

    authorizerMapper = toolbox.getAuthorizerMapper();
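    // Meters and the parse exception handler track row throughput and apply the tuning config's limits on logged
    // and saved parse exceptions.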
    rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
    parseExceptionHandler = new ParseExceptionHandler(
        rowIngestionMeters,
        tuningConfig.isLogParseExceptions(),
        tuningConfig.getMaxParseExceptions(),
        tuningConfig.getMaxSavedParseExceptions()
    );

    // Now we can initialize the StreamChunkParser with the given toolbox.
    final StreamChunkParser parser = new StreamChunkParser<RecordType>(
        this.parser,
        inputFormat,
        inputRowSchema,
        task.getDataSchema().getTransformSpec(),
        toolbox.getIndexingTmpDir(),
        row -> row != null && withinMinMaxRecordTime(row),
        rowIngestionMeters,
        parseExceptionHandler
    );

    initializeSequences();

    log.debug("Found chat handler of class[%s]", toolbox.getChatHandlerProvider().getClass().getName());
    toolbox.getChatHandlerProvider().register(task.getId(), this, false);

    runThread = Thread.currentThread();

    // Set up SegmentGenerationMetrics
    this.segmentGenerationMetrics = new SegmentGenerationMetrics();
    final TaskRealtimeMetricsMonitor metricsMonitor =
        TaskRealtimeMetricsMonitorBuilder.build(task, segmentGenerationMetrics, rowIngestionMeters);
    toolbox.addMonitor(metricsMonitor);

    final String lookupTier = task.getContextValue(CTX_KEY_LOOKUP_TIER);
    final LookupNodeService lookupNodeService = lookupTier == null ?
                                                toolbox.getLookupNodeService() :
                                                new LookupNodeService(lookupTier);

    final DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
        toolbox.getDruidNode(),
        NodeRole.PEON,
        ImmutableMap.of(
            toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
            lookupNodeService.getName(), lookupNodeService
        )
    );

    Throwable caughtExceptionOuter = null;

    // Milliseconds waited for created segments to be handed off
    long handoffWaitMs = 0L;

    try (final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier =
             task.newTaskRecordSupplier(toolbox)) {
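      // Announce this peon for serving and discovery only if the task is responsible for its own node announcements.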
      if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
        toolbox.getDataSegmentServerAnnouncer().announce();
        toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
      }
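      // Create the appenderator, which builds and persists segments in memory, and the driver, which manages
      // segment allocation, persistence, and publishing on top of it.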
      appenderator = task.newAppenderator(toolbox, segmentGenerationMetrics, rowIngestionMeters, parseExceptionHandler);
      driver = task.newDriver(appenderator, toolbox, segmentGenerationMetrics);

      // Start up, set up initial sequences.
      final Object restoredMetadata = driver.startJob(
          segmentId -> {
            try {
              if (lockGranularityToUse == LockGranularity.SEGMENT) {
                return toolbox.getTaskActionClient().submit(
                    new SegmentLockAcquireAction(
                        TaskLockType.EXCLUSIVE,
                        segmentId.getInterval(),
                        segmentId.getVersion(),
                        segmentId.getShardSpec().getPartitionNum(),
                        1000L
                    )
                ).isOk();
              } else {
                final TaskLock lock = toolbox.getTaskActionClient().submit(
                    new TimeChunkLockAcquireAction(
                        TaskLocks.determineLockTypeForAppend(task.getContext()),
                        segmentId.getInterval(),
                        1000L
                    )
                );
                if (lock == null) {
                  return false;
                }
                lock.assertNotRevoked();
                return true;
              }
            }
            catch (IOException e) {
              throw new RuntimeException(e);
            }
          }
      );
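      // If startJob() returned commit metadata from a previous run, resume from those offsets; otherwise start
      // from the initial sequence's start offsets.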
      if (restoredMetadata == null) {
        // No persist has happened so far, so this is either a brand new task or a replacement of a failed task.
        Preconditions.checkState(sequences.get(0).startOffsets.entrySet().stream().allMatch(
            partitionOffsetEntry ->
                createSequenceNumber(partitionOffsetEntry.getValue()).compareTo(
                    createSequenceNumber(ioConfig.getStartSequenceNumbers()
                                                 .getPartitionSequenceNumberMap()
                                                 .get(partitionOffsetEntry.getKey())
                    )) >= 0
        ), "Sequence start offsets are not compatible with the start sequences of the task");
        currOffsets.putAll(sequences.get(0).startOffsets);
      } else {
        @SuppressWarnings("unchecked")
        final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
        final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> restoredNextPartitions =
            deserializePartitionsFromMetadata(
                toolbox.getJsonMapper(),
                restoredMetadataMap.get(METADATA_NEXT_PARTITIONS)
            );

        currOffsets.putAll(restoredNextPartitions.getPartitionSequenceNumberMap());

        // Sanity checks.
        if (!restoredNextPartitions.getStream().equals(ioConfig.getStartSequenceNumbers().getStream())) {
          throw new ISE(
              "Restored stream[%s] but expected stream[%s]",
              restoredNextPartitions.getStream(),
              ioConfig.getStartSequenceNumbers().getStream()
          );
        }

        if (!currOffsets.keySet().equals(ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet())) {
          throw new ISE(
              "Restored partitions[%s] but expected partitions[%s]",
              currOffsets.keySet(),
              ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet()
          );
        }
        // The sequences list can be empty only if all sequences were published and the task stopped before it could
        // finish, which is rare.
        if (sequences.size() == 0 || getLastSequenceMetadata().isCheckpointed()) {
          this.endOffsets.putAll(sequences.size() == 0
                                 ? currOffsets
                                 : getLastSequenceMetadata().getEndOffsets());
        }
      }

      log.info(
          "Initialized sequences: %s",
          sequences.stream().map(SequenceMetadata::toString).collect(Collectors.joining(", "))
      );

      // Filter out partitions with END_OF_SHARD markers since these partitions have already been fully read. This
      // should have been done by the supervisor already so this is defensive.
      int numPreFilterPartitions = currOffsets.size();
      if (currOffsets.entrySet().removeIf(x -> isEndOfShard(x.getValue()))) {
        log.info(
            "Removed [%d] partitions from assignment which have already been closed.",
            numPreFilterPartitions - currOffsets.size()
        );
      }

      // Initialize lastReadOffsets immediately after restoring currOffsets. This is only done when end offsets are
      // inclusive, because the point of initializing lastReadOffsets here is so we know when to skip the start record.
      // When end offsets are exclusive, we never skip the start record.
      if (!isEndOffsetExclusive()) {
        for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : currOffsets.entrySet()) {
          final boolean isAtStart = entry.getValue().equals(
              ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(entry.getKey())
          );

          if (!isAtStart || ioConfig.getStartSequenceNumbers().getExclusivePartitions().contains(entry.getKey())) {
            lastReadOffsets.put(entry.getKey(), entry.getValue());
          }
        }
      }

      // Set up committer.
      final Supplier<Committer> committerSupplier = () -> {
        final Map<PartitionIdType, SequenceOffsetType> snapshot = ImmutableMap.copyOf(currOffsets);
        lastPersistedOffsets.clear();
        lastPersistedOffsets.putAll(snapshot);

        return new Committer()
        {
          @Override
          public Object getMetadata()
          {
            return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new SeekableStreamEndSequenceNumbers<>(stream, snapshot));
          }

          @Override
          public void run()
          {
            // Do nothing.
          }
        };
      };

      // restart publishing of sequences (if any)
      maybePersistAndPublishSequences(committerSupplier);

      Set<StreamPartition<PartitionIdType>> assignment = assignPartitions(recordSupplier);
      possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
      seekToStartingSequence(recordSupplier, assignment);

      ingestionState = IngestionState.BUILD_SEGMENTS;

      // Main loop.
      // Could eventually support leader/follower mode (for keeping replicas more in sync)
      boolean stillReading = !assignment.isEmpty();
      status = Status.READING;
      Throwable caughtExceptionInner = null;

      try {
        while (stillReading) {
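          // possiblyPause() blocks while a pause has been requested (e.g. by the supervisor during a checkpoint)
          // and returns true if the task was paused at some point.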
          if (possiblyPause()) {
            // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
            // partitions upon resuming. Don't call "seekToStartingSequence" after "assignPartitions", because there's
            // no need to re-seek here. All we're going to be doing is dropping partitions.
            assignment = assignPartitions(recordSupplier);
            possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);

            if (assignment.isEmpty()) {
              log.debug("All partitions have been fully read.");
              publishOnStop.set(true);
              stopRequested.set(true);
            }
          }

          // if stop is requested, or the task's end sequences were set by a call to setEndOffsets() with finish=true
          if (stopRequested.get() || sequences.size() == 0 || getLastSequenceMetadata().isCheckpointed()) {
            status = Status.PUBLISHING;
          }

          if (stopRequested.get()) {
            break;
          }

          if (backgroundThreadException != null) {
            throw new RuntimeException(backgroundThreadException);
          }

          checkPublishAndHandoffFailure();

          maybePersistAndPublishSequences(committerSupplier);

          // calling getRecords() ensures that exceptions specific to Kafka/Kinesis, like OffsetOutOfRangeException,
          // are handled in the subclasses.
          List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> records = getRecords(
              recordSupplier,
              toolbox
          );

          // note: getRecords() also updates assignment
          stillReading = !assignment.isEmpty();

          SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceToCheckpoint = null;
          AppenderatorDriverAddResult pushTriggeringAddResult = null;
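          // For each record: verify that it falls within the range we still need to read, parse it into rows,
          // add the rows through the driver, and advance the per-partition offsets.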
          for (OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType> record : records) {
            final boolean shouldProcess = verifyRecordInRange(record.getPartitionId(), record.getSequenceNumber());

            log.trace(
                "Got stream[%s] partition[%s] sequenceNumber[%s], shouldProcess[%s].",
                record.getStream(),
                record.getPartitionId(),
                record.getSequenceNumber(),
                shouldProcess
            );

            if (shouldProcess) {
              final List<InputRow> rows = parser.parse(record.getData(), isEndOfShard(record.getSequenceNumber()));
              boolean isPersistRequired = false;

              final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceToUse = sequences
                  .stream()
                  .filter(sequenceMetadata -> sequenceMetadata.canHandle(this, record))
                  .findFirst()
                  .orElse(null);

              if (sequenceToUse == null) {
                throw new ISE(
                    "Cannot find any valid sequence for record with partition [%s] and sequenceNumber [%s]. Current sequences: %s",
                    record.getPartitionId(),
                    record.getSequenceNumber(),
                    sequences
                );
              }

              for (InputRow row : rows) {
                final AppenderatorDriverAddResult addResult = driver.add(
                    row,
                    sequenceToUse.getSequenceName(),
                    committerSupplier,
                    true,
                    // do not allow incremental persists to happen until all the rows from this batch
                    // of records are indexed
                    false
                );

                if (addResult.isOk()) {
                  // If the number of rows in the segment exceeds the threshold after adding a row,
                  // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
                  final boolean isPushRequired = addResult.isPushRequired(
                      tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
                      tuningConfig.getPartitionsSpec()
                                  .getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
                  );
                  if (isPushRequired && !sequenceToUse.isCheckpointed()) {
                    pushTriggeringAddResult = addResult;
                    sequenceToCheckpoint = sequenceToUse;
                  }
                  isPersistRequired |= addResult.isPersistRequired();
                  partitionsThroughput.merge(record.getPartitionId(), 1L, Long::sum);
                } else {
                  // Failure to allocate segment puts determinism at risk, bail out to be safe.
                  // May want configurable behavior here at some point.
                  // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
                  throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
                }
              }
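              // If any add signaled that an incremental persist is needed, kick one off asynchronously; a persist
              // failure is surfaced to the main loop via backgroundThreadException.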
              if (isPersistRequired) {
                Futures.addCallback(
                    driver.persistAsync(committerSupplier.get()),
                    new FutureCallback<>()
                    {
                      @Override
                      public void onSuccess(@Nullable Object result)
                      {
                        log.debug("Persist completed with metadata: %s", result);
                      }

                      @Override
                      public void onFailure(Throwable t)
                      {
                        log.error("Persist failed, dying");
                        backgroundThreadException = t;
                      }
                    },
                    MoreExecutors.directExecutor()
                );
              }

              // In Kafka, we can easily get the next offset by adding 1, but in Kinesis there is no way to get the
              // next sequence number without making an expensive API call. So for Kafka we advance the offset by 1,
              // while for Kinesis we simply save the current sequence number.
              lastReadOffsets.put(record.getPartitionId(), record.getSequenceNumber());
              currOffsets.put(record.getPartitionId(), getNextStartOffset(record.getSequenceNumber()));
            }

            // Use record.getSequenceNumber() in the moreToRead check, since currOffsets might not have been
            // updated if we were skipping records for being beyond the end.
            final boolean moreToReadAfterThisRecord = isMoreToReadAfterReadingRecord(
                record.getSequenceNumber(),
                endOffsets.get(record.getPartitionId())
            );

            if (!moreToReadAfterThisRecord && assignment.remove(record.getStreamPartition())) {
              log.info("Finished reading stream[%s], partition[%s].", record.getStream(), record.getPartitionId());
              recordSupplier.assign(assignment);
              stillReading = !assignment.isEmpty();
            }
          }

          if (!stillReading) {
            // We let the segmentGenerationMetrics know that all messages have been read. This way, some metrics such
            // as high message gap need not be reported.
            segmentGenerationMetrics.markProcessingDone();
          }

          if (System.currentTimeMillis() > nextCheckpointTime) {
            sequenceToCheckpoint = getLastSequenceMetadata();
            log.info(
                "Next checkpoint time, updating sequenceToCheckpoint, SequenceToCheckpoint: [%s]",
                sequenceToCheckpoint
            );
          }
          if (pushTriggeringAddResult != null) {
            log.info(
                "Hit the row limit updating sequenceToCheckpoint, SequenceToCheckpoint: [%s], rowInSegment: [%s], TotalRows: [%s]",
                sequenceToCheckpoint,
                pushTriggeringAddResult.getNumRowsInSegment(),
                pushTriggeringAddResult.getTotalNumRowsInAppenderator()
            );
          }

          if (sequenceToCheckpoint != null && stillReading) {
            Preconditions.checkArgument(
                getLastSequenceMetadata()
                    .getSequenceName()
                    .equals(sequenceToCheckpoint.getSequenceName()),
                "Cannot checkpoint a sequence [%s] which is not the latest one, sequences %s",
                sequenceToCheckpoint,
                sequences
            );
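            // Pause reading and ask the supervisor (via a task action) to checkpoint the current sequence; the task
            // stays paused until it is resumed, typically by a setEndOffsets() call that starts a new sequence.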
            requestPause();
            final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
                task.getDataSource(),
                ioConfig.getTaskGroupId(),
                null,
                createDataSourceMetadata(
                    new SeekableStreamStartSequenceNumbers<>(
                        stream,
                        sequenceToCheckpoint.getStartOffsets(),
                        sequenceToCheckpoint.getExclusiveStartPartitions()
                    )
                )
            );
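            // A rejected checkpoint request is fatal; the task dies rather than read past an un-checkpointed sequence.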
            if (!toolbox.getTaskActionClient().submit(checkpointAction)) {
              throw new ISE("Checkpoint request with sequences [%s] failed, dying", currOffsets);
            }
          }
        }
      }
      catch (Exception e) {
        // (1) Catch all exceptions thrown while reading from the stream (Kafka/Kinesis).
        caughtExceptionInner = e;
        log.error(e, "Encountered exception in run() before persisting.");
        throw e;
      }
      finally {
        try {
          // To handle cases where tasks stop reading due to stop request or exceptions
          segmentGenerationMetrics.markProcessingDone();
          driver.persist(committerSupplier.get()); // persist pending data
        }
        catch (Exception e) {
          if (caughtExceptionInner != null) {
            caughtExceptionInner.addSuppressed(e);
          } else {
            throw e;
          }
        }
      }

      synchronized (statusLock) {
        if (stopRequested.get() && !publishOnStop.get()) {
          throw new InterruptedException("Stopping without publishing");
        }

        status = Status.PUBLISHING;
      }

      // We need to copy sequences here, because the success callback in publishAndRegisterHandoff removes items from
      // the sequence list. If a publish finishes before we finish iterating through the sequence list, we can
      // end up skipping some sequences.
      List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequencesSnapshot = new ArrayList<>(sequences);
      for (int i = 0; i < sequencesSnapshot.size(); i++) {
        final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata = sequencesSnapshot.get(i);
        if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
            && !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
          final boolean isLast = i == (sequencesSnapshot.size() - 1);
          if (isLast) {
            // Shorten endOffsets of the last sequence to match currOffsets.
            sequenceMetadata.setEndOffsets(currOffsets);
          }

          // Update assignments of the sequence, which should clear them. (This will be checked later, when the
          // Committer is built.)
          sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadAfterReadingRecord);
          publishingSequences.add(sequenceMetadata.getSequenceName());
          // Persist was already done in the finally block above, so we can go straight to publishing.
          publishAndRegisterHandoff(sequenceMetadata);
        }
      }

      if (backgroundThreadException != null) {
        throw new RuntimeException(backgroundThreadException);
      }

      // Wait for publish futures to complete.
      Futures.allAsList(publishWaitList).get();

      // Wait for handoff futures to complete.
      // Note that every publishing task (created by calling AppenderatorDriver.publish()) has a corresponding
      // handoffFuture. handoffFuture can throw an exception if 1) the corresponding publishFuture failed or 2) it
      // failed to persist sequences. It might also return null if handoff failed, but was recoverable.
      // See publishAndRegisterHandoff() for details.
      List<SegmentsAndCommitMetadata> handedOffList = Collections.emptyList();
      ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
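      // A handoffConditionTimeout of 0 means wait indefinitely for handoff; otherwise wait up to the timeout and
      // emit an alert (rather than failing the task) if it expires.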
      if (tuningConfig.getHandoffConditionTimeout() == 0) {
        handedOffList = Futures.allAsList(handOffWaitList).get();
      } else {
        final long start = System.nanoTime();
        try {
          handedOffList = Futures.allAsList(handOffWaitList)
                                 .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
          // Handoff timeout is not an indexing failure but a coordination failure. We simply ignore the timeout
          // exception here.
          log.makeAlert("Timeout waiting for handoff")
             .addData("taskId", task.getId())
             .addData("handoffConditionTimeout", tuningConfig.getHandoffConditionTimeout())
             .emit();
        }
        finally {
          handoffWaitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        }
      }

      for (SegmentsAndCommitMetadata handedOff : handedOffList) {
        log.info(
            "Handoff complete for segments: %s",
            String.join(", ", Lists.transform(handedOff.getSegments(), DataSegment::toString))
        );
      }

      appenderator.close();
    }
    catch (InterruptedException | RejectedExecutionException e) {
      // (2) Catch InterruptedException and RejectedExecutionException thrown during any of the ingestion steps,
      // including the final publishing.
      caughtExceptionOuter = e;
      try {
        Futures.allAsList(publishWaitList).cancel(true);
        Futures.allAsList(handOffWaitList).cancel(true);
        if (appenderator != null) {
          appenderator.closeNow();
        }
      }
      catch (Exception e2) {
        e.addSuppressed(e2);
      }

      // handle the InterruptedException that gets wrapped in a RejectedExecutionException
      if (e instanceof RejectedExecutionException
          && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
        throw e;
      }

      // if we were interrupted because we were asked to stop, handle the exception and return success, else rethrow
      if (!stopRequested.get()) {
        Thread.currentThread().interrupt();
        throw e;
      }
    }
    catch (Exception e) {
      // (3) Catch all other exceptions thrown during any of the ingestion steps, including the final publishing.
      caughtExceptionOuter = e;
      try {
        Futures.allAsList(publishWaitList).cancel(true);
        Futures.allAsList(handOffWaitList).cancel(true);
        if (appenderator != null) {
          appenderator.closeNow();
        }
      }
      catch (Exception e2) {
        e.addSuppressed(e2);
      }
      throw e;
    }
    finally {
      try {
        if (driver != null) {
          driver.close();
        }
        toolbox.getChatHandlerProvider().unregister(task.getId());
        toolbox.removeMonitor(metricsMonitor);
        if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
          toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
          toolbox.getDataSegmentServerAnnouncer().unannounce();
        }
        rejectionPeriodUpdaterExec.shutdown();
      }
      catch (Throwable e) {
        if (caughtExceptionOuter != null) {
          caughtExceptionOuter.addSuppressed(e);
        } else {
          throw e;
        }
      }
    }

    ingestionState = IngestionState.COMPLETED;
    toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null, handoffWaitMs));
    return TaskStatus.success(task.getId());
  }