private void doScheduleTaskList()

in runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java [117:166]


  /**
   * Performs one scheduling pass over the currently pending task collection.
   *
   * The collection is taken out of {@code pendingTaskCollectionPointer} via {@code getAndSetNull()};
   * if nothing was pending, the method logs and returns. Every task whose state is READY is matched
   * against the executors exposed by {@code executorRegistry.viewExecutors}: the executor set is
   * narrowed by each scheduling constraint registered for the task's execution properties, and, if any
   * candidate remains, the scheduling policy selects one, the task state is changed to EXECUTING, and
   * the task is handed to the selected executor. Tasks that are not READY are skipped. Tasks left
   * without a candidate executor are collected and offered back to the pointer through
   * {@code setIfNull}, so they can be retried when no newer collection has been set.
   */
  private void doScheduleTaskList() {
    final Optional<Collection<Task>> pendingTasks = pendingTaskCollectionPointer.getAndSetNull();
    if (!pendingTasks.isPresent()) {
      // Nothing is pending at the moment
      LOG.debug("PendingTaskCollectionPointer is empty. Awaiting for more Tasks...");
      return;
    }

    final Collection<Task> tasksToSchedule = pendingTasks.get();
    final List<Task> unscheduledTasks = new ArrayList<>();
    for (final Task task : tasksToSchedule) {
      if (planStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY)) {
        executorRegistry.viewExecutors(executors -> {
          final MutableObject<Set<ExecutorRepresenter>> remaining = new MutableObject<>(executors);

          // Narrow the executor set with every constraint registered for the task's properties.
          task.getExecutionProperties().forEachProperties(property -> {
            final Optional<SchedulingConstraint> maybeConstraint
              = schedulingConstraintRegistry.get(property.getClass());
            if (!maybeConstraint.isPresent() || remaining.getValue().isEmpty()) {
              // No constraint for this property, or nothing left to filter
              return;
            }
            final SchedulingConstraint constraint = maybeConstraint.get();
            final Set<ExecutorRepresenter> stillEligible = remaining.getValue().stream()
              .filter(executor -> constraint.testSchedulability(executor, task))
              .collect(Collectors.toSet());
            remaining.setValue(stillEligible);
          });

          if (remaining.getValue().isEmpty()) {
            // No executor satisfies the constraints; remember the task for a later retry
            unscheduledTasks.add(task);
            return;
          }

          final ExecutorRepresenter chosen = schedulingPolicy.selectExecutor(remaining.getValue(), task);
          // The state change must precede the actual dispatch
          planStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);

          LOG.info("{} scheduled to {}", task.getTaskId(), chosen.getExecutorId());
          // Dispatch the task to the chosen executor
          chosen.onTaskScheduled(task);
        });
      } else {
        // Guard against race conditions causing duplicate task launches
        LOG.debug("Skipping {} as it is not READY", task.getTaskId());
      }
    }

    LOG.debug("All except {} were scheduled among {}", unscheduledTasks, tasksToSchedule);
    if (!unscheduledTasks.isEmpty()) {
      // Offer the leftovers back; they are only stored if no new task list has been set
      pendingTaskCollectionPointer.setIfNull(unscheduledTasks);
    }
  }