in samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java [148:236]
/**
 * Groups the given tasks into containers, preferring to place each task on a processor whose
 * {@link LocationId} matches the task's previous location.
 *
 * <p>The assignment proceeds as follows:
 * <ol>
 *   <li>If no processor locality is available, delegate to {@code group(taskModels, List)}.</li>
 *   <li>If there are more processors than tasks, keep only the lexicographically least
 *       {@code taskModels.size()} processor ids.</li>
 *   <li>Each task with a known previous location is assigned to the first processor (in
 *       processor-id order) at that location that holds fewer than
 *       {@code taskModels.size() / numProcessors} tasks.</li>
 *   <li>Every remaining task goes to the first under-assigned task group (ordered by container id),
 *       or, once no group is under-assigned, to processors in round-robin processor-id order.</li>
 * </ol>
 *
 * @param taskModels the tasks to assign; must not be empty.
 * @param grouperMetadata supplies the previous task locality and the current processor locality.
 * @return the container models built from the resulting task groups.
 * @throws IllegalArgumentException if {@code taskModels} is empty.
 */
public Set<ContainerModel> group(Set<TaskModel> taskModels, GrouperMetadata grouperMetadata) {
  // Validate that the task models are not empty before reading any metadata.
  Preconditions.checkArgument(!taskModels.isEmpty(), "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.");
  Map<TaskName, LocationId> taskLocality = grouperMetadata.getTaskLocality();

  // Invoke the default grouper when the processor locality does not exist.
  if (MapUtils.isEmpty(grouperMetadata.getProcessorLocality())) {
    LOG.info("ProcessorLocality is empty. Generating with the default group method.");
    return group(taskModels, new ArrayList<>());
  }

  // A TreeMap keeps processor ids in natural (lexicographic) order, which makes the per-location
  // processor lists and the round-robin iteration below deterministic.
  Map<String, LocationId> processorLocality = new TreeMap<>(grouperMetadata.getProcessorLocality());

  // When there are more processors than task models, keep only the lexicographically least `x`
  // processors (where x = number of tasks). Collect back into a TreeMap: the two-argument
  // Collectors.toMap returns a HashMap, which would discard the sorted order relied upon below.
  if (processorLocality.size() > taskModels.size()) {
    processorLocality = processorLocality.entrySet()
        .stream()
        .limit(taskModels.size())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> first, TreeMap::new));
  }

  // Build the LocationId -> processors mapping and the processorId -> TaskGroup mapping.
  // Iterating the sorted map keeps each location's processor list in processor-id order.
  Map<LocationId, List<String>> locationIdToProcessors = new HashMap<>();
  Map<String, TaskGroup> processorIdToTaskGroup = new HashMap<>();
  processorLocality.forEach((processorId, locationId) -> {
    locationIdToProcessors.computeIfAbsent(locationId, unused -> new ArrayList<>()).add(processorId);
    processorIdToTaskGroup.put(processorId, new TaskGroup(processorId, new ArrayList<>()));
  });

  int numTasksPerProcessor = taskModels.size() / processorLocality.size();
  Set<TaskName> assignedTasks = new HashSet<>();

  // A processor is considered under-assigned when the number of tasks assigned to it is less than
  // (number of tasks / number of processors). Map each task to an under-assigned processor that
  // shares the task's previous locality, if one exists.
  for (TaskModel taskModel : taskModels) {
    LocationId taskLocationId = taskLocality.get(taskModel.getTaskName());
    if (taskLocationId != null) {
      List<String> processorIds = locationIdToProcessors.getOrDefault(taskLocationId, new ArrayList<>());
      for (String processorId : processorIds) {
        TaskGroup taskGroup = processorIdToTaskGroup.get(processorId);
        if (taskGroup.size() < numTasksPerProcessor) {
          taskGroup.addTaskName(taskModel.getTaskName().getTaskName());
          assignedTasks.add(taskModel.getTaskName());
          break;
        }
      }
    }
  }

  // Some tasks may have no previous locality, or no processor at their previous locality. This
  // cyclic iterator over processor ids (in sorted order) assigns such tasks round robin.
  Iterator<String> processorIdsCyclicIterator = Iterators.cycle(processorLocality.keySet());

  // Order the task groups so an under-assigned group is chosen deterministically.
  List<TaskGroup> taskGroups = new ArrayList<>(processorIdToTaskGroup.values());
  taskGroups.sort(Comparator.comparing(TaskGroup::getContainerId));

  // Map the tasks left over from the previous stage to the first under-assigned processor. When no
  // under-assigned processor remains, map them to processors in a round-robin manner. orElseGet is
  // lazy, so the cyclic iterator only advances when no under-assigned group exists.
  for (TaskModel taskModel : taskModels) {
    if (!assignedTasks.contains(taskModel.getTaskName())) {
      TaskGroup targetGroup = taskGroups.stream()
          .filter(taskGroup -> taskGroup.size() < numTasksPerProcessor)
          .findFirst()
          .orElseGet(() -> processorIdToTaskGroup.get(processorIdsCyclicIterator.next()));
      targetGroup.addTaskName(taskModel.getTaskName().getTaskName());
      assignedTasks.add(taskModel.getTaskName());
    }
  }
  return TaskGroup.buildContainerModels(taskModels, taskGroups);
}