in indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java [2013:2240]
private void discoverTasks() throws ExecutionException, InterruptedException
{
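  // Reconcile every active task for this datasource with the supervisor's view: kill tasks reading expired
  // partitions, asynchronously classify unknown tasks as publishing or actively reading, stop tasks that no
  // longer match the current partition allocation or supervisor spec, and adopt the rest into task groups.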
  int taskCount = 0;
  List<String> futureTaskIds = new ArrayList<>();
  List<ListenableFuture<Boolean>> futures = new ArrayList<>();
  final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();

  final Map<String, Task> activeTaskMap = getActiveTaskMap();
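
  // Only examine tasks of this supervisor's type; other task types may be active for the same datasource.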
  for (Task task : activeTaskMap.values()) {
    if (!doesTaskTypeMatchSupervisor(task)) {
      continue;
    }

    taskCount++;

    @SuppressWarnings("unchecked")
    final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> seekableStreamIndexTask =
        (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>) task;
    final String taskId = task.getId();

    // Check if the task has any inactive partitions. If so, terminate the task, even if some of the partitions
    // assigned to it are still active. We terminate such tasks early to more rapidly ensure that all active
    // partitions are evenly distributed and being read, and to avoid having to map expired partitions, which are
    // no longer tracked in partitionIds, to a task group.
    if (supportsPartitionExpiration()) {
      Set<PartitionIdType> taskPartitions = seekableStreamIndexTask.getIOConfig()
                                                                   .getStartSequenceNumbers()
                                                                   .getPartitionSequenceNumberMap()
                                                                   .keySet();

      Set<PartitionIdType> inactivePartitionsInTask = Sets.difference(
          taskPartitions,
          new HashSet<>(partitionIds)
      );
      if (!inactivePartitionsInTask.isEmpty()) {
        killTaskWithSuccess(
            taskId,
            "Task[%s] with partition set[%s] has inactive partitions[%s], stopping task.",
            taskId,
            taskPartitions,
            inactivePartitionsInTask
        );
        continue;
      }
    }

    // Determine which task group this task belongs to based on one of the partitions handled by this task. If we
    // later determine that this task is actively reading, we will make sure that it matches our current partition
    // allocation (getTaskGroupIdForPartition(partition) should return the same value for every partition being
    // read by this task) and kill it if it is not compatible. If the task is instead found to be in the publishing
    // state, we will permit it to complete even if it doesn't match our current partition allocation, to support
    // seamless schema migration.
    Iterator<PartitionIdType> it = seekableStreamIndexTask.getIOConfig()
                                                          .getStartSequenceNumbers()
                                                          .getPartitionSequenceNumberMap()
                                                          .keySet()
                                                          .iterator();
    final Integer taskGroupId = (it.hasNext() ? getTaskGroupIdForPartition(it.next()) : null);
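    // A null taskGroupId means the task has no assigned start partitions; such a task is ignored by this pass.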

    if (taskGroupId != null) {
      // Check whether we already know about this task, either in [activelyReadingTaskGroups] or in
      // [pendingCompletionTaskGroups]; if not, add it to [activelyReadingTaskGroups], or to
      // [pendingCompletionTaskGroups] if its status is PUBLISHING.
      TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId);
      if (!isTaskInPendingCompletionGroups(taskId) && (taskGroup == null || !taskGroup.tasks.containsKey(taskId))) {
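        // Unknown task: fetch its status (and, if it is publishing, its end offsets) asynchronously; the
        // callback below classifies the task once the response arrives.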
        futureTaskIds.add(taskId);
        futures.add(
            Futures.transform(
                getStatusAndPossiblyEndOffsets(taskId),
                new Function<>()
                {
                  @Override
                  public Boolean apply(Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdType, SequenceOffsetType>> pair)
                  {
                    final SeekableStreamIndexTaskRunner.Status status = pair.lhs;
                    final Map<PartitionIdType, SequenceOffsetType> publishingTaskEndOffsets = pair.rhs;
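                    // publishingTaskEndOffsets is only consulted in the PUBLISHING branch below; for any other
                    // status it may be null.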
                    try {
                      log.debug("Task[%s], status[%s]", taskId, status);

                      // Both branches below need the task's starting sequences, so fetch them once.
                      final Map<PartitionIdType, SequenceOffsetType> startingSequences =
                          seekableStreamIndexTask.getIOConfig()
                                                 .getStartSequenceNumbers()
                                                 .getPartitionSequenceNumberMap();

                      if (status == SeekableStreamIndexTaskRunner.Status.PUBLISHING) {
                        startingSequences.keySet().forEach(
                            partition -> addDiscoveredTaskToPendingCompletionTaskGroups(
                                getTaskGroupIdForPartition(partition),
                                taskId,
                                startingSequences
                            )
                        );

                        // Update partitionGroups with the publishing task's sequences (if they are greater than
                        // the existing ones) so that the next tasks will start reading from where this task left
                        // off. If we received invalid endOffset values, we clear the known offset to refetch the
                        // last committed offset from metadata. If any endOffset value is invalid, we treat the
                        // entire set as invalid as a safety measure.
                        boolean endOffsetsAreInvalid = false;
                        for (Entry<PartitionIdType, SequenceOffsetType> entry : publishingTaskEndOffsets.entrySet()) {
                          PartitionIdType partition = entry.getKey();
                          SequenceOffsetType sequence = entry.getValue();
                          if (sequence.equals(getEndOfPartitionMarker())) {
                            log.info(
                                "Got end-of-partition(EOS) marker for partition[%s] from task[%s] in discoverTasks, clearing partition offset to refetch from metadata.",
                                partition,
                                taskId
                            );
                            endOffsetsAreInvalid = true;
                            partitionOffsets.put(partition, getNotSetMarker());
                          }
                        }

                        if (!endOffsetsAreInvalid) {
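                          // Advance each partition's offset with a compare-and-swap loop: putIfAbsent covers the
                          // not-yet-tracked case, and replace() fails (forcing a retry) if a concurrent writer
                          // changed the stored offset between the read and the write.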
                          for (Entry<PartitionIdType, SequenceOffsetType> entry : publishingTaskEndOffsets.entrySet()) {
                            PartitionIdType partition = entry.getKey();
                            SequenceOffsetType sequence = entry.getValue();
                            boolean succeeded;
                            do {
                              succeeded = true;
                              SequenceOffsetType previousOffset = partitionOffsets.putIfAbsent(partition, sequence);
                              if (previousOffset != null
                                  && makeSequenceNumber(previousOffset).compareTo(makeSequenceNumber(sequence)) < 0) {
                                succeeded = partitionOffsets.replace(partition, previousOffset, sequence);
                              }
                            } while (!succeeded);
                          }
                        }
                      } else {
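                        // The task claims to be actively reading: verify that each of its partitions maps to this
                        // task group under the current allocation, and that its configs match the supervisor spec.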
                        for (PartitionIdType partition : startingSequences.keySet()) {
                          if (!taskGroupId.equals(getTaskGroupIdForPartition(partition))) {
                            log.warn(
                                "Stopping task[%s] as it does not match the current partition allocation.",
                                taskId
                            );

                            // Returning false triggers a call to stopTask.
                            return false;
                          }
                        }

                        // Make sure the task's IO and tuning configs match the supervisor config; only if the task
                        // is current do we create the corresponding taskGroup (when it does not already exist).
                        if (!isTaskCurrent(taskGroupId, taskId, activeTaskMap)) {
                          log.info("Stopping task[%s] as it does not match the current supervisor spec.", taskId);

                          // Returning false triggers a call to stopTask.
                          return false;
                        } else {
                          final TaskGroup taskGroup = activelyReadingTaskGroups.computeIfAbsent(
                              taskGroupId,
                              k -> {
                                log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
                                // We reassign the task's original base sequence name (from the existing task) to
                                // the task group so that the replica segment allocations are the same.
                                return new TaskGroup(
                                    taskGroupId,
                                    ImmutableMap.copyOf(startingSequences),
                                    null,
                                    seekableStreamIndexTask.getIOConfig().getMinimumMessageTime(),
                                    seekableStreamIndexTask.getIOConfig().getMaximumMessageTime(),
                                    seekableStreamIndexTask.getIOConfig()
                                                           .getStartSequenceNumbers()
                                                           .getExclusivePartitions(),
                                    seekableStreamIndexTask.getIOConfig().getBaseSequenceName()
                                );
                              }
                          );
                          taskGroupsToVerify.put(taskGroupId, taskGroup);
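                          // Remember this group so that its checkpoints can be verified against the metadata store
                          // once all discovery futures have resolved.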
                          final TaskData prevTaskData = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
                          if (prevTaskData != null) {
                            throw new ISE(
                                "Task[%s] already exists in taskGroup[%d] with data[%s]",
                                taskId, taskGroup.groupId, prevTaskData
                            );
                          }
                          verifySameSequenceNameForAllTasksInGroup(taskGroupId);
                        }
                      }
                      return true;
                    }
                    catch (Throwable t) {
                      stateManager.recordThrowableEvent(t);
                      log.error(t, "An error occurred while discovering task[%s]", taskId);
                      return null;
                    }
                  }
                },
                workerExec
            )
        );
      }
    }
  }

  List<Either<Throwable, Boolean>> results = coalesceAndAwait(futures);

  final List<ListenableFuture<Void>> stopFutures = new ArrayList<>();
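  // Each future resolves to true (task adopted or already publishing), false (stop the task gracefully), or
  // null/error (kill the task).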
  for (int i = 0; i < results.size(); i++) {
    String taskId = futureTaskIds.get(i);
    if (results.get(i).isError() || results.get(i).valueOrThrow() == null) {
      killTask(taskId, "Task[%s] failed to return status, killing task", taskId);
    } else if (Boolean.valueOf(false).equals(results.get(i).valueOrThrow())) {
      // "return false" above means that we want to stop the task.
      stopFutures.add(stopTask(taskId, false));
    }
  }
  log.debug("Found [%d] seekablestream indexing tasks for datasource[%s]", taskCount, dataSource);

  if (!stopFutures.isEmpty()) {
    coalesceAndAwait(stopFutures);
  }

  // Make sure the checkpoints are consistent with each other and with the metadata store.
  verifyAndMergeCheckpoints(taskGroupsToVerify.values());

  // A pause issued by the previous Overlord's supervisor immediately before a leader change can leave tasks
  // in a state where they are active but not reading.
  resumeAllActivelyReadingTasks();
}