public List discoverPartitions()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java [124:184]


    public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
        if (!closed && !wakeup) {
            try {
                List<KafkaTopicPartition> newDiscoveredPartitions;

                // (1) get all possible partitions, based on whether we are subscribed to fixed
                // topics or a topic pattern
                if (topicsDescriptor.isFixedTopics()) {
                    newDiscoveredPartitions =
                            getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
                } else {
                    List<String> matchedTopics = getAllTopics();

                    // retain topics that match the pattern
                    Iterator<String> iter = matchedTopics.iterator();
                    while (iter.hasNext()) {
                        if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                            iter.remove();
                        }
                    }

                    if (matchedTopics.size() != 0) {
                        // get partitions only for matched topics
                        newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                    } else {
                        newDiscoveredPartitions = null;
                    }
                }

                // (2) eliminate partition that are old partitions or should not be subscribed by
                // this subtask
                if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                    throw new RuntimeException(
                            "Unable to retrieve any partitions with KafkaTopicsDescriptor: "
                                    + topicsDescriptor);
                } else {
                    Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                    KafkaTopicPartition nextPartition;
                    while (iter.hasNext()) {
                        nextPartition = iter.next();
                        if (!setAndCheckDiscoveredPartition(nextPartition)) {
                            iter.remove();
                        }
                    }
                }

                return newDiscoveredPartitions;
            } catch (WakeupException e) {
                // the actual topic / partition metadata fetching methods
                // may be woken up midway; reset the wakeup flag and rethrow
                wakeup = false;
                throw e;
            }
        } else if (!closed && wakeup) {
            // may have been woken up before the method call
            wakeup = false;
            throw new WakeupException();
        } else {
            throw new ClosedException();
        }
    }