private TaskStatus runInternal()

in indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java [337:530]


  /**
   * Runs the Hadoop ingestion in two reflective phases and publishes the resulting segments.
   *
   * <ol>
   *   <li>Registers {@code killHadoopJob()} as a resource closer for abnormal exit and builds the class loader
   *       used for both phases.</li>
   *   <li>Determine-partitions phase: invokes {@code runTask} on a
   *       {@code HadoopDetermineConfigInnerProcessingRunner} obtained through that class loader and deserializes
   *       the returned JSON status. A status without a schema fails the task.</li>
   *   <li>Resolves the segment version: when the spec declares no input intervals, an exclusive time chunk lock
   *       is acquired over the umbrella of the determined bucket intervals; otherwise the single lock the task
   *       already holds is used. If the tuning config requests an explicit version, it must sort strictly
   *       before the lock version or the task fails.</li>
   *   <li>Build-segments phase: invokes {@code runTask} on a {@code HadoopIndexGeneratorInnerProcessingRunner}.
   *       When the status carries segment paths, the index files are renamed, the segments are published and,
   *       if {@code awaitSegmentAvailabilityTimeoutMillis} is positive, the task waits for segment
   *       availability.</li>
   * </ol>
   *
   * Every failure path that returns a {@link TaskStatus} records the message in {@code errorMsg} and writes the
   * task completion reports before returning. {@code indexerGeneratorCleanupJob} always runs on the way out.
   *
   * @param toolbox toolbox providing the JSON mapper, config, segment pusher, task action client and report writer
   * @return success once segments are published (and optionally awaited), otherwise a failure carrying the
   *         error message of the phase that failed
   * @throws Exception on setup or lock failures; any exception thrown inside either reflective phase is
   *                   rethrown wrapped in a {@link RuntimeException}
   */
  private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
  {
    // Both flags are consumed by the cleanup job in the outer finally block.
    boolean indexGeneratorJobAttempted = false;
    boolean indexGeneratorJobSuccess = false;
    HadoopIngestionSpec indexerSchema = null;
    try {
      registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
      String hadoopJobIdFile = getHadoopJobIdFileName();
      logExtensionsConfig();
      final ClassLoader loader = buildClassLoader(toolbox);
      // No intervals in the spec means they are only known after the determine-partitions phase,
      // so the lock has to be acquired afterwards (see version resolution below).
      boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();

      HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
          spec,
          jsonMapper,
          new OverlordActionBasedUsedSegmentsRetriever(toolbox)
      );

      Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject(
          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner",
          loader
      );
      determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner);

      String[] determinePartitionsInput = new String[]{
          toolbox.getJsonMapper().writeValueAsString(spec),
          toolbox.getConfig().getHadoopWorkingPath(),
          toolbox.getSegmentPusher().getPathForHadoop(),
          hadoopJobIdFile
      };

      final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
      Class<?> determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass();
      Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod(
          "runTask",
          determinePartitionsInput.getClass()
      );
      try {
        Thread.currentThread().setContextClassLoader(loader);

        ingestionState = IngestionState.DETERMINE_PARTITIONS;

        // The String[] is wrapped in an Object[] so it is passed as the single array argument of runTask
        // rather than being spread as varargs.
        final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke(
            determinePartitionsInnerProcessingRunner,
            new Object[]{determinePartitionsInput}
        );

        determineConfigStatus = toolbox
            .getJsonMapper()
            .readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class);

        indexerSchema = determineConfigStatus.getSchema();
        if (indexerSchema == null) {
          // No schema came back: fail with the message reported by the determine-partitions phase.
          errorMsg = determineConfigStatus.getErrorMsg();
          toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
          return TaskStatus.failure(
              getId(),
              errorMsg
          );
        }
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
      finally {
        Thread.currentThread().setContextClassLoader(oldLoader);
      }

      // We should have a lock from before we started running only if interval was specified
      String version;
      if (determineIntervals) {
        Interval interval = JodaUtils.umbrellaInterval(
            JodaUtils.condenseIntervals(
                indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals()
            )
        );
        final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
        // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
        final TaskLock lock = Preconditions.checkNotNull(
            toolbox.getTaskActionClient().submit(
                new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs)
            ),
            "Cannot acquire a lock for interval[%s]", interval
        );
        lock.assertNotRevoked();
        version = lock.getVersion();
      } else {
        // Exactly one lock is expected here; getOnlyElement throws otherwise.
        Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());
        final TaskLock myLock = Iterables.getOnlyElement(locks);
        version = myLock.getVersion();
      }

      final String specVersion = indexerSchema.getTuningConfig().getVersion();
      if (indexerSchema.getTuningConfig().isUseExplicitVersion()) {
        if (specVersion.compareTo(version) < 0) {
          version = specVersion;
        } else {
          String errMsg =
              StringUtils.format(
                  "Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].",
                  specVersion,
                  version
              );
          log.error(errMsg);
          // Record the message and write the completion reports like the other failure paths do,
          // instead of writing a null report.
          errorMsg = errMsg;
          toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
          return TaskStatus.failure(getId(), errorMsg);
        }
      }

      log.info("Setting version to: %s", version);

      Object innerProcessingRunner = getForeignClassloaderObject(
          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner",
          loader
      );
      buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner);

      String[] buildSegmentsInput = new String[]{
          toolbox.getJsonMapper().writeValueAsString(indexerSchema),
          version,
          hadoopJobIdFile
      };

      Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
      Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass());

      try {
        Thread.currentThread().setContextClassLoader(loader);

        ingestionState = IngestionState.BUILD_SEGMENTS;
        // Set before the invocation so the cleanup job knows the index generator was started
        // even if the call below throws.
        indexGeneratorJobAttempted = true;
        final String jobStatusString = (String) innerProcessingRunTask.invoke(
            innerProcessingRunner,
            new Object[]{buildSegmentsInput}
        );

        buildSegmentsStatus = toolbox.getJsonMapper().readValue(
            jobStatusString,
            HadoopIndexGeneratorInnerProcessingStatus.class
        );

        List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths = buildSegmentsStatus.getDataSegmentAndIndexZipFilePaths();
        if (dataSegmentAndIndexZipFilePaths != null) {
          indexGeneratorJobSuccess = true;
          renameSegmentIndexFilesJob(
              toolbox.getJsonMapper().writeValueAsString(indexerSchema),
              toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths)
          );

          // Collect straight into an ArrayList; no intermediate list copy is needed.
          ArrayList<DataSegment> segments = dataSegmentAndIndexZipFilePaths
              .stream()
              .map(DataSegmentAndIndexZipFilePath::getSegment)
              .collect(Collectors.toCollection(ArrayList::new));
          toolbox.publishSegments(segments);

          // Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value
          // for awaitSegmentAvailabilityTimeoutMillis
          if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) {
            ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
            waitForSegmentAvailability(
                toolbox,
                segments,
                spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis()
            );
          }

          ingestionState = IngestionState.COMPLETED;
          toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
          return TaskStatus.success(getId());
        } else {
          // No segment paths came back: fail with the message reported by the build-segments phase.
          errorMsg = buildSegmentsStatus.getErrorMsg();
          toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
          return TaskStatus.failure(
              getId(),
              errorMsg
          );
        }
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
      finally {
        Thread.currentThread().setContextClassLoader(oldLoader);
      }
    }
    finally {
      // Runs on every exit path; the schema is passed as null if the determine-partitions phase
      // never produced one.
      indexerGeneratorCleanupJob(
          indexGeneratorJobAttempted,
          indexGeneratorJobSuccess,
          indexerSchema == null ? null : toolbox.getJsonMapper().writeValueAsString(indexerSchema)
      );
    }
  }