in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java [857:921]
/**
 * Creates and starts the partition-discovery thread, which periodically asks the
 * {@code partitionDiscoverer} for newly appeared Kafka partitions and hands them to the
 * running {@code kafkaFetcher}.
 *
 * <p>The loop eagerly re-checks the {@code running} flag before every operation so the
 * thread can exit as soon as the consumer is canceled. Any unexpected exception is
 * published through {@code discoveryLoopErrorRef} (so the main task thread can rethrow
 * it) and the consumer is canceled to also terminate the fetcher loop.
 *
 * @param discoveryLoopErrorRef holder the discovery thread uses to report an unexpected
 *     failure back to the thread that owns the consumer lifecycle
 */
private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
    discoveryLoopThread =
            new Thread(
                    () -> {
                        try {
                            // --------------------- partition discovery loop
                            // ---------------------
                            // throughout the loop, we always eagerly check if we are still
                            // running before
                            // performing the next operation, so that we can escape the loop as
                            // soon as possible
                            while (running) {
                                // guard the debug statement: getIndexOfThisSubtask() is an
                                // extra call we only want to pay when debug logging is on
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug(
                                            "Consumer subtask {} is trying to discover new partitions ...",
                                            getRuntimeContext().getIndexOfThisSubtask());
                                }
                                final List<KafkaTopicPartition> discoveredPartitions;
                                try {
                                    discoveredPartitions =
                                            partitionDiscoverer.discoverPartitions();
                                } catch (AbstractPartitionDiscoverer.WakeupException
                                        | AbstractPartitionDiscoverer.ClosedException e) {
                                    // the partition discoverer may have been closed or woken up
                                    // before or during the discovery;
                                    // this would only happen if the consumer was canceled;
                                    // simply escape the loop
                                    break;
                                }
                                // no need to add the discovered partitions if we were closed
                                // during the meantime
                                if (running && !discoveredPartitions.isEmpty()) {
                                    kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                                }
                                // do not waste any time sleeping if we're not running anymore
                                // (a discoveryIntervalMillis of 0 means discover as fast as
                                // possible, with no pause between attempts)
                                if (running && discoveryIntervalMillis != 0) {
                                    try {
                                        Thread.sleep(discoveryIntervalMillis);
                                    } catch (InterruptedException iex) {
                                        // may be interrupted if the consumer was canceled
                                        // midway; simply escape the loop
                                        // NOTE(review): the interrupt status is deliberately
                                        // not restored here — the finally block below calls
                                        // cancel(), which should not observe a pending
                                        // interrupt on this thread
                                        break;
                                    }
                                }
                            }
                        } catch (Exception e) {
                            // surface any unexpected failure to the owning task thread,
                            // which checks this reference and rethrows
                            discoveryLoopErrorRef.set(e);
                        } finally {
                            // calling cancel will also let the fetcher loop escape
                            // (if not running, cancel() was already called)
                            if (running) {
                                cancel();
                            }
                        }
                    },
                    "Kafka Partition Discovery for "
                            + getRuntimeContext().getTaskNameWithSubtasks());
    discoveryLoopThread.start();
}