public void assignSplits()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java [124:152]


    /**
     * Re-wraps the given Kafka partition split assignment as {@link DynamicKafkaSourceSplit}s
     * carrying this proxy's {@code kafkaClusterId} and hands the result to the wrapped
     * enumerator context.
     *
     * <p>When info logging is enabled, the total number of splits across all subtasks is logged
     * together with the cluster id and the incoming assignment. If the incoming assignment
     * contains no subtask entries, nothing is forwarded to the wrapped context.
     *
     * @param newSplitAssignments per-subtask split assignment to wrap and forward
     */
    public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {
        if (logger.isInfoEnabled()) {
            // Only pay for counting the splits when the message will actually be emitted.
            long totalSplits = 0L;
            for (Collection<KafkaPartitionSplit> splitsOfSubtask :
                    newSplitAssignments.assignment().values()) {
                totalSplits += splitsOfSubtask.size();
            }
            logger.info(
                    "Assigning {} splits for cluster {}: {}",
                    totalSplits,
                    kafkaClusterId,
                    newSplitAssignments);
        }

        // Same subtask keys as the input; every split is tagged with the cluster id.
        Map<Integer, List<DynamicKafkaSourceSplit>> wrappedBySubtask = new HashMap<>();
        for (Map.Entry<Integer, List<KafkaPartitionSplit>> perSubtask :
                newSplitAssignments.assignment().entrySet()) {
            List<DynamicKafkaSourceSplit> wrappedSplits =
                    perSubtask.getValue().stream()
                            .map(
                                    kafkaSplit ->
                                            new DynamicKafkaSourceSplit(
                                                    kafkaClusterId, kafkaSplit))
                            .collect(Collectors.toList());
            wrappedBySubtask.put(perSubtask.getKey(), wrappedSplits);
        }

        // Nothing to forward when the incoming assignment had no subtask entries.
        if (wrappedBySubtask.isEmpty()) {
            return;
        }
        enumContext.assignSplits(new SplitsAssignment<>(wrappedBySubtask));
    }