public Set group()

in samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java [148:236]


  /**
   * Groups the given task models into container models, preferring to place each task on a
   * processor that is registered at the task's last known location.
   *
   * <p>When the processor locality is empty, the work is delegated to
   * {@code group(taskModels, new ArrayList<>())}. Otherwise the assignment runs in three stages:
   * <ol>
   *   <li>When there are more processors than tasks, only the lexicographically least
   *       {@code taskModels.size()} processor ids are retained.</li>
   *   <li>Each task with a known location is assigned to the first processor at that location
   *       that currently holds fewer than {@code taskModels.size() / processorCount} tasks.</li>
   *   <li>Every task still unassigned goes to the first under-assigned task group (ordered by
   *       container id) or, when none is left, to the processors in round-robin order.</li>
   * </ol>
   *
   * @param taskModels the task models to distribute; must not be empty
   * @param grouperMetadata supplies the task locality and the processor locality
   * @return the container models built by {@code TaskGroup.buildContainerModels} from the task groups
   * @throws IllegalArgumentException if {@code taskModels} is empty
   */
  public Set<ContainerModel> group(Set<TaskModel> taskModels, GrouperMetadata grouperMetadata) {
    // Validate that the task models are not empty before touching any other input.
    Preconditions.checkArgument(!taskModels.isEmpty(), "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.");
    Map<TaskName, LocationId> taskLocality = grouperMetadata.getTaskLocality();

    // Invoke the default grouper when the processor locality does not exist.
    if (MapUtils.isEmpty(grouperMetadata.getProcessorLocality())) {
      LOG.info("ProcessorLocality is empty. Generating with the default group method.");
      return group(taskModels, new ArrayList<>());
    }

    // Sorted by processor id so that every later traversal is lexicographic and deterministic.
    Map<String, LocationId> processorLocality = new TreeMap<>(grouperMetadata.getProcessorLocality());

    // When there are more processors than task models, keep only the lexicographically least `x`
    // processors (where x = taskModels.size()). The result is collected back into a TreeMap: the
    // two-argument Collectors.toMap gives no ordering guarantee, which would make the locality pass
    // and the round-robin pass below depend on hash order.
    if (processorLocality.size() > taskModels.size()) {
      processorLocality = processorLocality.entrySet()
                                           .stream()
                                           .limit(taskModels.size())
                                           .collect(Collectors.toMap(
                                               Map.Entry::getKey,
                                               Map.Entry::getValue,
                                               // Keys come from a map, so this merge function is never invoked.
                                               (existing, duplicate) -> existing,
                                               TreeMap::new));
    }

    Map<LocationId, List<String>> locationIdToProcessors = new HashMap<>();
    Map<String, TaskGroup> processorIdToTaskGroup = new HashMap<>();

    // Generate the LocationId to processors mapping and the processorId to TaskGroup mapping.
    processorLocality.forEach((processorId, locationId) -> {
      locationIdToProcessors.computeIfAbsent(locationId, key -> new ArrayList<>()).add(processorId);
      processorIdToTaskGroup.put(processorId, new TaskGroup(processorId, new ArrayList<>()));
    });

    // processorLocality is non-empty here: the source map was non-empty and taskModels.size() >= 1.
    int numTasksPerProcessor = taskModels.size() / processorLocality.size();
    Set<TaskName> assignedTasks = new HashSet<>();

    // A processor is considered under-assigned when the number of tasks assigned to it is less than
    // (number of tasks / number of processors).
    // Map the tasks to the under-assigned processors with the same locality.
    for (TaskModel taskModel : taskModels) {
      LocationId taskLocationId = taskLocality.get(taskModel.getTaskName());
      if (taskLocationId == null) {
        continue;
      }
      List<String> processorIds = locationIdToProcessors.get(taskLocationId);
      if (processorIds == null) {
        // No processor is registered at the task's previous location.
        continue;
      }
      for (String processorId : processorIds) {
        TaskGroup taskGroup = processorIdToTaskGroup.get(processorId);
        if (taskGroup.size() < numTasksPerProcessor) {
          taskGroup.addTaskName(taskModel.getTaskName().getTaskName());
          assignedTasks.add(taskModel.getTaskName());
          break;
        }
      }
    }

    // In some scenarios, the task either might not have any previous locality or might not have any
    // processor that maps to its previous locality. This cyclic processorId iterator helps us in
    // those scenarios to assign the processorIds to those kinds of tasks in a round robin fashion.
    Iterator<String> processorIdsCyclicIterator = Iterators.cycle(processorLocality.keySet());

    // Order the taskGroups to choose a task group in a deterministic fashion for unassigned tasks.
    List<TaskGroup> taskGroups = new ArrayList<>(processorIdToTaskGroup.values());
    taskGroups.sort(Comparator.comparing(TaskGroup::getContainerId));

    // For the tasks left over from the previous stage, map them to any under-assigned processor.
    // When an under-assigned processor doesn't exist, map them to any processor from the
    // available processors in a round robin manner.
    for (TaskModel taskModel : taskModels) {
      if (assignedTasks.contains(taskModel.getTaskName())) {
        continue;
      }
      TaskGroup target = taskGroups.stream()
              .filter(candidate -> candidate.size() < numTasksPerProcessor)
              .findFirst()
              .orElseGet(() -> processorIdToTaskGroup.get(processorIdsCyclicIterator.next()));
      target.addTaskName(taskModel.getTaskName().getTaskName());
      assignedTasks.add(taskModel.getTaskName());
    }

    return TaskGroup.buildContainerModels(taskModels, taskGroups);
  }