in runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java [117:166]
/**
 * Performs one scheduling pass over the task collection currently held by
 * {@code pendingTaskCollectionPointer}.
 *
 * The collection is taken out of the pointer (leaving it null). Each task that is still
 * {@code READY} is matched against the executors exposed by {@code executorRegistry}: the
 * executor set is narrowed by every scheduling constraint registered for the task's execution
 * properties, and {@code schedulingPolicy} picks one of the remaining executors. Tasks for which
 * no executor remains are collected and offered back to the pointer via {@code setIfNull}.
 */
private void doScheduleTaskList() {
  final Optional<Collection<Task>> pending = pendingTaskCollectionPointer.getAndSetNull();
  if (!pending.isPresent()) {
    // Nothing has been handed over for scheduling
    LOG.debug("PendingTaskCollectionPointer is empty. Awaiting for more Tasks...");
    return;
  }

  final Collection<Task> tasks = pending.get();
  final List<Task> unscheduled = new ArrayList<>();

  for (final Task task : tasks) {
    if (!planStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY)) {
      // A task that already left READY must not be launched a second time
      LOG.debug("Skipping {} as it is not READY", task.getTaskId());
      continue;
    }

    executorRegistry.viewExecutors(registered -> {
      final MutableObject<Set<ExecutorRepresenter>> survivors = new MutableObject<>(registered);

      // Narrow the executor set with each constraint registered for the task's properties.
      task.getExecutionProperties().forEachProperties(property -> {
        final Optional<SchedulingConstraint> lookup = schedulingConstraintRegistry.get(property.getClass());
        if (!lookup.isPresent() || survivors.getValue().isEmpty()) {
          // No constraint for this property, or nothing left to filter
          return;
        }
        final SchedulingConstraint constraint = lookup.get();
        final Set<ExecutorRepresenter> narrowed = survivors.getValue().stream()
            .filter(e -> constraint.testSchedulability(e, task))
            .collect(Collectors.toSet());
        survivors.setValue(narrowed);
      });

      if (survivors.getValue().isEmpty()) {
        // No executor satisfies the constraints; remember the task for a later attempt
        unscheduled.add(task);
        return;
      }

      // Let the policy pick among the executors that passed every constraint
      final ExecutorRepresenter chosen = schedulingPolicy.selectExecutor(survivors.getValue(), task);
      // Record the state transition before the task is handed to the executor
      planStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
      LOG.info("{} scheduled to {}", task.getTaskId(), chosen.getExecutorId());
      // Hand the task over to the chosen executor
      chosen.onTaskScheduled(task);
    });
  }

  LOG.debug("All except {} were scheduled among {}", unscheduled, tasks);
  if (!unscheduled.isEmpty()) {
    // Offer the leftovers back, unless a new task list has been set in the meantime
    pendingTaskCollectionPointer.setIfNull(unscheduled);
  }
}