private void discoverTasks()

in indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java [2013:2240]


  /**
   * Discovers the active tasks that belong to this supervisor and reconciles them with the supervisor's in-memory
   * task-group state.
   *
   * <p>For each active task whose type matches this supervisor:
   * <ul>
   *   <li>If {@code supportsPartitionExpiration()} is true and the task's start partitions include any partition not
   *   in {@code partitionIds}, the task is killed via {@code killTaskWithSuccess} and not considered further.</li>
   *   <li>The task's group id is derived from the first partition in its start sequence map. If the task has no
   *   start partitions, it is ignored.</li>
   *   <li>If the task is not already tracked in {@code activelyReadingTaskGroups} or a pending-completion group, its
   *   status (and possibly its end offsets) is fetched asynchronously via {@code getStatusAndPossiblyEndOffsets} and
   *   handled on {@code workerExec}:
   *   <ul>
   *     <li>PUBLISHING: every start partition is registered in a pending-completion task group. The task's end
   *     offsets are then merged into {@code partitionOffsets}, keeping the larger offset per partition. If any end
   *     offset equals the end-of-partition marker, no offsets are merged; instead, each such partition's offset is
   *     set to the not-set marker.</li>
   *     <li>Any other status: the task is marked for stopping if its partitions map to different task groups, or if
   *     {@code isTaskCurrent} rejects it. Otherwise it is added to the matching (possibly newly created) entry of
   *     {@code activelyReadingTaskGroups}.</li>
   *   </ul></li>
   * </ul>
   *
   * <p>After all status futures complete:
   * <ul>
   *   <li>tasks whose future failed or produced {@code null} are killed;</li>
   *   <li>tasks whose future produced {@code false} are stopped, and those stops are awaited;</li>
   *   <li>checkpoints of the task groups that received newly discovered tasks are verified and merged;</li>
   *   <li>all actively reading tasks are resumed.</li>
   * </ul>
   *
   * @throws ExecutionException   declared checked exception; presumably surfaced by one of the awaiting helpers
   *                              called here (e.g. coalesceAndAwait / verifyAndMergeCheckpoints) — TODO confirm
   * @throws InterruptedException declared checked exception; presumably surfaced while waiting on futures — TODO confirm
   */
  private void discoverTasks() throws ExecutionException, InterruptedException
  {
    // Counts every type-matching task, including those killed or skipped below; only used in the debug log.
    int taskCount = 0;
    // futureTaskIds.get(i) is the task whose status future is futures.get(i). The result loop after
    // coalesceAndAwait relies on this index alignment.
    List<String> futureTaskIds = new ArrayList<>();
    List<ListenableFuture<Boolean>> futures = new ArrayList<>();

    // Task groups that received a newly discovered, actively reading task; their checkpoints are verified at the end.
    final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();

    final Map<String, Task> activeTaskMap = getActiveTaskMap();

    for (Task task : activeTaskMap.values()) {
      if (!doesTaskTypeMatchSupervisor(task)) {
        continue;
      }

      taskCount++;
      // Presumably safe because doesTaskTypeMatchSupervisor only accepts this supervisor's task type — TODO confirm.
      @SuppressWarnings("unchecked")
      final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> seekableStreamIndexTask = (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>) task;
      final String taskId = task.getId();

      // Check if the task has any inactive partitions. If so, terminate the task. Even if some of the
      // partitions assigned to the task are still active, we still terminate the task. We terminate such tasks early
      // to more rapidly ensure that all active partitions are evenly distributed and being read, and to avoid
      // having to map expired partitions which are no longer tracked in partitionIds to a task group.
      if (supportsPartitionExpiration()) {
        Set<PartitionIdType> taskPartitions = seekableStreamIndexTask.getIOConfig()
                                                                     .getStartSequenceNumbers()
                                                                     .getPartitionSequenceNumberMap()
                                                                     .keySet();
        // partitionIds is copied into a HashSet because Guava's Sets.difference operates on Sets; the result is a view.
        Set<PartitionIdType> inactivePartitionsInTask = Sets.difference(
            taskPartitions,
            new HashSet<>(partitionIds)
        );
        if (!inactivePartitionsInTask.isEmpty()) {
          killTaskWithSuccess(
              taskId,
              "Task[%s] with partition set[%s] has inactive partitions[%s], stopping task.",
              taskId,
              taskPartitions,
              inactivePartitionsInTask
          );
          continue;
        }
      }

      // Determine which task group this task belongs to based on one of the partitions handled by this task. If we
      // later determine that this task is actively reading, we will make sure that it matches our current partition
      // allocation (getTaskGroupIdForPartition(partition) should return the same value for every partition being read
      // by this task) and kill it if it is not compatible. If the task is instead found to be in the publishing
      // state, we will permit it to complete even if it doesn't match our current partition allocation to support
      // seamless schema migration.

      Iterator<PartitionIdType> it = seekableStreamIndexTask.getIOConfig()
                                                            .getStartSequenceNumbers()
                                                            .getPartitionSequenceNumberMap()
                                                            .keySet()
                                                            .iterator();
      // A task with no start partitions yields a null group id and is not tracked at all.
      final Integer taskGroupId = (it.hasNext() ? getTaskGroupIdForPartition(it.next()) : null);

      if (taskGroupId != null) {
        // check to see if we already know about this task, either in [activelyReadingTaskGroups] or in [pendingCompletionTaskGroups]
        // and if not add it to activelyReadingTaskGroups or pendingCompletionTaskGroups (if status = PUBLISHING)
        TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId);

        if (!isTaskInPendingCompletionGroups(taskId) && (taskGroup == null || !taskGroup.tasks.containsKey(taskId))) {
          futureTaskIds.add(taskId);
          // The callback's result is interpreted after coalesceAndAwait below:
          //   true  -> keep the task,
          //   false -> stop it via stopTask,
          //   null (or a failed future) -> kill it via killTask.
          futures.add(
              Futures.transform(
                  getStatusAndPossiblyEndOffsets(taskId),
                  new Function<>()
                  {
                    @Override
                    public Boolean apply(Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdType, SequenceOffsetType>> pair)
                    {
                      final SeekableStreamIndexTaskRunner.Status status = pair.lhs;
                      final Map<PartitionIdType, SequenceOffsetType> publishingTaskEndOffsets = pair.rhs;

                      try {
                        log.debug("Task [%s], status [%s]", taskId, status);
                        if (status == SeekableStreamIndexTaskRunner.Status.PUBLISHING) {
                          // Each start partition is registered under the group id it maps to *now*; a publishing
                          // task is allowed to span groups (see the allocation comment above the loop).
                          seekableStreamIndexTask.getIOConfig()
                                                 .getStartSequenceNumbers()
                                                 .getPartitionSequenceNumberMap()
                                                 .keySet()
                                                 .forEach(
                                                     partition -> addDiscoveredTaskToPendingCompletionTaskGroups(
                                                         getTaskGroupIdForPartition(
                                                             partition),
                                                         taskId,
                                                         seekableStreamIndexTask.getIOConfig()
                                                                                .getStartSequenceNumbers()
                                                                                .getPartitionSequenceNumberMap()
                                                     ));

                          // update partitionGroups with the publishing task's sequences (if they are greater than what is
                          // existing) so that the next tasks will start reading from where this task left off.
                          // If we received invalid endOffset values, we clear the known offset to refetch the last committed offset
                          // from metadata. If any endOffset values are invalid, we treat the entire set as invalid as a safety measure.
                          boolean endOffsetsAreInvalid = false;
                          for (Entry<PartitionIdType, SequenceOffsetType> entry : publishingTaskEndOffsets.entrySet()) {
                            PartitionIdType partition = entry.getKey();
                            SequenceOffsetType sequence = entry.getValue();
                            if (sequence.equals(getEndOfPartitionMarker())) {
                              log.info(
                                  "Got end-of-partition(EOS) marker for partition[%s] from task[%s] in discoverTasks, clearing partition offset to refetch from metadata.",
                                  partition,
                                  taskId
                              );
                              endOffsetsAreInvalid = true;
                              partitionOffsets.put(partition, getNotSetMarker());
                            }
                          }

                          if (!endOffsetsAreInvalid) {
                            for (Entry<PartitionIdType, SequenceOffsetType> entry : publishingTaskEndOffsets.entrySet()) {
                              PartitionIdType partition = entry.getKey();
                              SequenceOffsetType sequence = entry.getValue();

                              // Compare-and-set style update: store `sequence` if absent, or replace the existing
                              // offset only when it is smaller. The retry loop guards against concurrent updates to
                              // partitionOffsets between the read and the replace (presumably a ConcurrentMap — TODO
                              // confirm).
                              boolean succeeded;
                              do {
                                succeeded = true;
                                SequenceOffsetType previousOffset = partitionOffsets.putIfAbsent(partition, sequence);
                                if (previousOffset != null
                                    && (makeSequenceNumber(previousOffset)
                                    .compareTo(makeSequenceNumber(
                                        sequence))) < 0) {
                                  succeeded = partitionOffsets.replace(partition, previousOffset, sequence);
                                }
                              } while (!succeeded);
                            }
                          }
                        } else {
                          // Any non-publishing status: the task must map entirely to the group derived from its first
                          // partition, otherwise it is stopped.
                          for (PartitionIdType partition : seekableStreamIndexTask.getIOConfig()
                                                                                  .getStartSequenceNumbers()
                                                                                  .getPartitionSequenceNumberMap()
                                                                                  .keySet()) {
                            if (!taskGroupId.equals(getTaskGroupIdForPartition(partition))) {
                              log.warn(
                                  "Stopping task[%s] as it does not match the current partition allocation.",
                                  taskId
                              );

                              // Returning false triggers a call to stopTask.
                              return false;
                            }
                          }
                          // make sure the task's io and tuning configs match with the supervisor config
                          // if it is current then only create corresponding taskGroup if it does not exist
                          if (!isTaskCurrent(taskGroupId, taskId, activeTaskMap)) {
                            log.info("Stopping task[%s] as it does not match the current supervisor spec.", taskId);

                            // Returning false triggers a call to stopTask.
                            return false;
                          } else {
                            final TaskGroup taskGroup = activelyReadingTaskGroups.computeIfAbsent(
                                taskGroupId,
                                k -> {
                                  log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
                                  // We reassign the task's original base sequence name (from the existing task) to the
                                  // task group so that the replica segment allocations are the same.
                                  return new TaskGroup(
                                      taskGroupId,
                                      ImmutableMap.copyOf(
                                          seekableStreamIndexTask.getIOConfig()
                                                                 .getStartSequenceNumbers()
                                                                 .getPartitionSequenceNumberMap()
                                      ),
                                      null,
                                      seekableStreamIndexTask.getIOConfig().getMinimumMessageTime(),
                                      seekableStreamIndexTask.getIOConfig().getMaximumMessageTime(),
                                      seekableStreamIndexTask.getIOConfig()
                                                             .getStartSequenceNumbers()
                                                             .getExclusivePartitions(),
                                      seekableStreamIndexTask.getIOConfig().getBaseSequenceName()
                                  );
                                }
                            );
                            taskGroupsToVerify.put(taskGroupId, taskGroup);
                            // A task id already present in the group is treated as an error. The ISE is caught by the
                            // catch (Throwable) below, which yields null, so the task ends up being killed.
                            final TaskData prevTaskData = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
                            if (prevTaskData != null) {
                              throw new ISE(
                                  "Task[%s] already exists in taskGroup[%d] with data[%s]",
                                  taskId, taskGroup.groupId, prevTaskData
                              );
                            }
                            verifySameSequenceNameForAllTasksInGroup(taskGroupId);
                          }
                        }
                        return true;
                      }
                      catch (Throwable t) {
                        // Record the failure and return null; the result loop below kills tasks with a null result.
                        stateManager.recordThrowableEvent(t);
                        log.error(t, "An error occurred while discovering task[%s]", taskId);
                        return null;
                      }
                    }
                  }, workerExec
              )
          );
        }
      }
    }

    // Results are consumed by index and matched to futureTaskIds, so this relies on coalesceAndAwait returning them
    // in the same order as `futures`.
    List<Either<Throwable, Boolean>> results = coalesceAndAwait(futures);

    final List<ListenableFuture<Void>> stopFutures = new ArrayList<>();
    for (int i = 0; i < results.size(); i++) {
      String taskId = futureTaskIds.get(i);
      // A failed future or a null result (from the catch block above) means the task's status could not be handled.
      if (results.get(i).isError() || results.get(i).valueOrThrow() == null) {
        killTask(taskId, "Task[%s] failed to return status, killing task", taskId);
      } else if (Boolean.valueOf(false).equals(results.get(i).valueOrThrow())) {
        // "return false" above means that we want to stop the task.
        stopFutures.add(stopTask(taskId, false));
      }
    }
    log.debug("Found [%d] seekablestream indexing tasks for datasource[%s]", taskCount, dataSource);

    if (!stopFutures.isEmpty()) {
      coalesceAndAwait(stopFutures);
    }

    // make sure the checkpoints are consistent with each other and with the metadata store
    verifyAndMergeCheckpoints(taskGroupsToVerify.values());

    // A pause from the previous Overlord's supervisor, immediately before leader change,
    // can lead to tasks being in a state where they are active but do not read.
    resumeAllActivelyReadingTasks();
  }