private void createAndStartDiscoveryLoop()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java [857:921]


    private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
        discoveryLoopThread =
                new Thread(
                        () -> {
                            try {
                                // --------------------- partition discovery loop
                                // ---------------------

                                // throughout the loop, we always eagerly check if we are still
                                // running before
                                // performing the next operation, so that we can escape the loop as
                                // soon as possible

                                while (running) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug(
                                                "Consumer subtask {} is trying to discover new partitions ...",
                                                getRuntimeContext().getIndexOfThisSubtask());
                                    }

                                    final List<KafkaTopicPartition> discoveredPartitions;
                                    try {
                                        discoveredPartitions =
                                                partitionDiscoverer.discoverPartitions();
                                    } catch (AbstractPartitionDiscoverer.WakeupException
                                            | AbstractPartitionDiscoverer.ClosedException e) {
                                        // the partition discoverer may have been closed or woken up
                                        // before or during the discovery;
                                        // this would only happen if the consumer was canceled;
                                        // simply escape the loop
                                        break;
                                    }

                                    // no need to add the discovered partitions if we were closed
                                    // during the meantime
                                    if (running && !discoveredPartitions.isEmpty()) {
                                        kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                                    }

                                    // do not waste any time sleeping if we're not running anymore
                                    if (running && discoveryIntervalMillis != 0) {
                                        try {
                                            Thread.sleep(discoveryIntervalMillis);
                                        } catch (InterruptedException iex) {
                                            // may be interrupted if the consumer was canceled
                                            // midway; simply escape the loop
                                            break;
                                        }
                                    }
                                }
                            } catch (Exception e) {
                                discoveryLoopErrorRef.set(e);
                            } finally {
                                // calling cancel will also let the fetcher loop escape
                                // (if not running, cancel() was already called)
                                if (running) {
                                    cancel();
                                }
                            }
                        },
                        "Kafka Partition Discovery for "
                                + getRuntimeContext().getTaskNameWithSubtasks());

        discoveryLoopThread.start();
    }