public void run()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java [757:830]


    /**
     * Main entry point of this consumer subtask.
     *
     * <p>Registers the offset-commit success/failure counters, installs the default
     * {@code offsetCommitCallback}, creates the Kafka fetcher from
     * {@code subscribedPartitionsToStartOffsets} and then runs the fetch loop, either directly or
     * through {@code runWithPartitionDiscovery()} depending on {@code discoveryIntervalMillis}.
     *
     * @param sourceContext the context handed to the fetcher; it is marked as temporarily idle
     *     when this subtask starts without any seed partitions
     * @throws Exception if {@code subscribedPartitionsToStartOffsets} has not been set, or if
     *     creating or running the fetcher fails
     */
    public void run(SourceContext<T> sourceContext) throws Exception {
        if (subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

        // initialize commit metrics and default offset callback method
        this.successfulCommits =
                this.getRuntimeContext()
                        .getMetricGroup()
                        .counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
        this.failedCommits =
                this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
        // captured once so that both the commit callback and the log statement below can use it
        final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();

        this.offsetCommitCallback =
                new KafkaCommitCallback() {
                    @Override
                    public void onSuccess() {
                        successfulCommits.inc();
                    }

                    @Override
                    public void onException(Throwable cause) {
                        // placeholder-style logging, consistent with the rest of this method;
                        // the trailing Throwable is logged as the exception, not as a parameter
                        LOG.warn(
                                "Consumer subtask {} failed async Kafka commit.",
                                subtaskIndex,
                                cause);
                        failedCommits.inc();
                    }
                };

        // mark the subtask as temporarily idle if there are no initial seed partitions;
        // once this subtask discovers some partitions and starts collecting records, the subtask's
        // status will automatically be triggered back to be active.
        if (subscribedPartitionsToStartOffsets.isEmpty()) {
            sourceContext.markAsTemporarilyIdle();
        }

        LOG.info(
                "Consumer subtask {} creating fetcher with offsets {}.",
                subtaskIndex,
                subscribedPartitionsToStartOffsets);
        // from this point forward:
        //   - 'snapshotState' will draw offsets from the fetcher,
        //     instead of being built from `subscribedPartitionsToStartOffsets`
        //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
        //     Kafka through the fetcher, if configured to do so)
        this.kafkaFetcher =
                createFetcher(
                        sourceContext,
                        subscribedPartitionsToStartOffsets,
                        watermarkStrategy,
                        (StreamingRuntimeContext) getRuntimeContext(),
                        offsetCommitMode,
                        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                        useMetrics);

        // 'running' may have been cleared while the fetcher was being created; in that case
        // do not enter the fetch loop at all
        if (!running) {
            return;
        }

        // depending on whether partition discovery is enabled, the remaining logic branches
        // off into 2 paths:
        //  1) Discovery disabled - only the main fetcher loop is executed, on this thread
        //  2) Discovery enabled  - delegated to runWithPartitionDiscovery(); per the original
        //                          note, the partition discovery loop is executed as a separate
        //                          thread, with this thread running the main fetcher loop
        if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
            kafkaFetcher.runFetchLoop();
        } else {
            runWithPartitionDiscovery();
        }
    }