public void run()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java [155:292]


    /**
     * Body of the consumer thread: creates the Kafka consumer, optionally registers its metrics,
     * and then loops -- issuing pending offset commits, picking up newly discovered partitions,
     * polling records and passing each batch to the {@link Handover} -- for as long as {@code
     * running} stays true.
     *
     * <p>A failure to create the consumer is reported through the handover and ends the method
     * immediately. Once the consumer exists, any {@link Throwable} is reported through the
     * handover, and both the handover and the consumer are closed before the method returns.
     */
    public void run() {
        // nothing to do if shutdown was requested before the thread got going
        if (!running) {
            return;
        }

        // channel used to talk to FlinkKafkaConsumer's main thread
        final Handover mainThreadHandover = this.handover;

        // The KafkaConsumer is created here so that this method owns its set-up and tear-down.
        // That matters because the consumer has multi-threading issues, including concurrent
        // 'close()' calls.
        try {
            this.consumer = getConsumer(kafkaProperties);
        } catch (Throwable creationFailure) {
            mainThreadHandover.reportError(creationFailure);
            return;
        }

        // past this point, the finally block below makes sure the consumer gets closed
        try {
            // expose Kafka's very own metrics through Flink's metric groups
            if (useMetrics) {
                final Map<MetricName, ? extends Metric> allMetrics = consumer.metrics();
                if (allMetrics != null) {
                    for (Map.Entry<MetricName, ? extends Metric> entry : allMetrics.entrySet()) {
                        final String metricName = entry.getKey().name();
                        final Metric kafkaMetric = entry.getValue();

                        consumerMetricGroup.gauge(metricName, new KafkaMetricWrapper(kafkaMetric));

                        // TODO the subtask-level registration only exists for compatibility
                        // purposes; should be removed in the future
                        subtaskMetricGroup.gauge(metricName, new KafkaMetricWrapper(kafkaMetric));
                    }
                } else {
                    // MapR's Kafka implementation hands back null instead of a metrics map
                    log.info("Consumer implementation does not support metrics");
                }
            }

            // shutdown may have been requested in the meantime
            if (!running) {
                return;
            }

            // Most recently polled batch. It survives into the next iteration when the thread
            // was woken up while blocking on the handover, so the batch is offered again.
            ConsumerRecords<byte[], byte[]> pendingRecords = null;

            // main fetch loop
            while (running) {

                // issue the next offset commit, unless one is still in flight
                if (!commitInProgress) {
                    // take-and-clear the pending work so the same offsets are not committed
                    // over and over
                    final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback>
                            toCommit = nextOffsetsToCommit.getAndSet(null);

                    if (toCommit != null) {
                        log.debug("Sending async offset commit request to Kafka broker");

                        // Ordering matters: raise the in-progress flag first and only then
                        // send the commit command.
                        commitInProgress = true;
                        retryOnceOnWakeup(
                                () ->
                                        consumer.commitAsync(
                                                toCommit.f0, new CommitCallback(toCommit.f1)),
                                "commitAsync");
                    }
                }

                try {
                    // With partitions already assigned, a non-blocking poll of the queue is
                    // enough. Without any, block until at least one shows up instead of hot
                    // spinning this loop; this relies on the queue being closed on shutdown, so
                    // the wait is not indefinite.
                    //
                    // Found partitions are not carried across iterations through this local;
                    // they are carried across via re-adding them to the unassigned partitions
                    // queue.
                    final List<KafkaTopicPartitionState<T, TopicPartition>> discovered =
                            hasAssignedPartitions
                                    ? unassignedPartitionsQueue.pollBatch()
                                    : unassignedPartitionsQueue.getBatchBlocking();

                    if (discovered != null) {
                        reassignPartitions(discovered);
                    }
                } catch (AbortedReassignmentException aborted) {
                    // reassignment did not go through; start over with the next iteration
                    continue;
                }

                if (!hasAssignedPartitions) {
                    // KafkaConsumer.poll would throw an exception without assigned partitions
                    continue;
                }

                // fetch a fresh batch only if no undelivered batch is left over
                if (pendingRecords == null) {
                    try {
                        pendingRecords = consumer.poll(Duration.ofMillis(pollTimeout));
                    } catch (WakeupException wokenDuringPoll) {
                        continue;
                    }
                }

                try {
                    mainThreadHandover.produce(pendingRecords);
                    pendingRecords = null;
                } catch (Handover.WakeupException wokenDuringHandover) {
                    // keep the batch and fall through to the next iteration
                }
            }
            // end main fetch loop
        } catch (Throwable failure) {
            // Tell the main thread and exit. The exception may stem from the main thread having
            // closed the handover, in which case reporting is irrelevant but harmless.
            mainThreadHandover.reportError(failure);
        } finally {
            // close the handover in case it is not yet closed or in an error state
            mainThreadHandover.close();

            // close the KafkaConsumer; a failure here is only logged
            try {
                consumer.close();
            } catch (Throwable closeFailure) {
                log.warn("Error while closing Kafka consumer", closeFailure);
            }
        }
    }