in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java [124:184]
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    // Runs one discovery round and returns the partitions accepted by
    // setAndCheckDiscoveredPartition(...). Throws ClosedException when the discoverer is
    // closed, WakeupException when a wakeup was requested before or during the call (the
    // wakeup flag is cleared in both cases), and RuntimeException when no partitions at all
    // could be retrieved for the configured topics descriptor.

    if (closed) {
        throw new ClosedException();
    }
    if (wakeup) {
        // a wakeup arrived before this call started; consume the flag and report it
        wakeup = false;
        throw new WakeupException();
    }

    try {
        // step 1: collect every candidate partition, either from the fixed topic list or
        // from the topics that match the subscribed pattern
        final List<KafkaTopicPartition> candidates;
        if (topicsDescriptor.isFixedTopics()) {
            candidates = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
        } else {
            final List<String> topics = getAllTopics();

            // drop every topic that the descriptor does not match
            for (Iterator<String> topicIt = topics.iterator(); topicIt.hasNext(); ) {
                String topic = topicIt.next();
                if (!topicsDescriptor.isMatchingTopic(topic)) {
                    topicIt.remove();
                }
            }

            // only query partition metadata when at least one topic matched
            candidates = topics.isEmpty() ? null : getAllPartitionsForTopics(topics);
        }

        if (candidates == null || candidates.isEmpty()) {
            throw new RuntimeException(
                    "Unable to retrieve any partitions with KafkaTopicsDescriptor: "
                            + topicsDescriptor);
        }

        // step 2: keep only partitions that are new and should be subscribed by this
        // subtask; setAndCheckDiscoveredPartition returns false for the ones to discard
        for (Iterator<KafkaTopicPartition> partitionIt = candidates.iterator();
                partitionIt.hasNext(); ) {
            if (!setAndCheckDiscoveredPartition(partitionIt.next())) {
                partitionIt.remove();
            }
        }

        return candidates;
    } catch (WakeupException wokenUp) {
        // the metadata fetching calls can be woken up midway; clear the flag so the next
        // call starts fresh, then propagate to the caller
        wakeup = false;
        throw wokenUp;
    }
}