public void open()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java [560:754]


    /**
     * Prepares this consumer subtask for reading.
     *
     * <p>In order, this method resolves the offset commit mode, creates and opens the partition
     * discoverer, computes {@code subscribedPartitionsToStartOffsets} (from the restored state
     * if there is one, otherwise according to the configured startup mode), and opens the
     * deserializer.
     *
     * @param configuration not read by this method
     * @throws IllegalStateException if the startup mode is {@code SPECIFIC_OFFSETS} or {@code
     *     TIMESTAMP} but the matching offsets / timestamp were never provided
     * @throws Exception any failure raised by the collaborators invoked here (partition
     *     discoverer, offset lookup, deserializer) is propagated unchanged
     */
    public void open(Configuration configuration) throws Exception {
        // Resolve how offsets are committed from the auto-commit flag, the commit-on-checkpoint
        // flag and whether the runtime context reports checkpointing as enabled.
        this.offsetCommitMode =
                OffsetCommitModes.fromConfiguration(
                        getIsAutoCommitEnabled(),
                        enableCommitOnCheckpoints,
                        ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();

        // The discoverer has to be opened before it can be asked for partitions.
        this.partitionDiscoverer =
                createPartitionDiscoverer(topicsDescriptor, subtaskIndex, parallelism);
        this.partitionDiscoverer.open();

        subscribedPartitionsToStartOffsets = new HashMap<>();
        final List<KafkaTopicPartition> discoveredPartitions =
                partitionDiscoverer.discoverPartitions();

        if (restoredState == null) {
            seedStartOffsetsFromStartupMode(discoveredPartitions);
            logStartupModeSubscription(subtaskIndex);
        } else {
            seedStartOffsetsFromRestoredState(discoveredPartitions, subtaskIndex, parallelism);
        }

        this.deserializer.open(
                RuntimeContextInitializationContextAdapters.deserializationAdapter(
                        getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
    }

    /**
     * Populates {@code subscribedPartitionsToStartOffsets} from {@code restoredState}.
     *
     * <p>Discovered partitions that are missing from the restored state are first added to it
     * with the earliest-offset sentinel. Only the entries that the partition assigner maps to
     * this subtask are then subscribed, optionally dropping those whose topic no longer matches
     * the current topics descriptor.
     *
     * @param discoveredPartitions partitions returned by the partition discoverer
     * @param subtaskIndex index of this subtask
     * @param parallelism number of parallel subtasks
     * @throws Exception declared so that anything thrown by the partition assigner propagates
     *     unchanged to {@code open}
     */
    private void seedStartOffsetsFromRestoredState(
            List<KafkaTopicPartition> discoveredPartitions, int subtaskIndex, int parallelism)
            throws Exception {
        for (KafkaTopicPartition discovered : discoveredPartitions) {
            if (restoredState.containsKey(discovered)) {
                continue;
            }
            // not part of the restored state: register it with the earliest-offset sentinel
            restoredState.put(discovered, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
        }

        // the restored state covers more than this subtask's share; subscribe only to the
        // partitions that the assigner maps to this subtask
        for (Map.Entry<KafkaTopicPartition, Long> restored : restoredState.entrySet()) {
            final KafkaTopicPartition partition = restored.getKey();
            if (KafkaTopicPartitionAssigner.assign(partition, parallelism) != subtaskIndex) {
                continue;
            }
            subscribedPartitionsToStartOffsets.put(partition, restored.getValue());
        }

        if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
            // removing a key through the key-set view removes the whole mapping
            subscribedPartitionsToStartOffsets
                    .keySet()
                    .removeIf(
                            subscribed -> {
                                final boolean noLongerMatching =
                                        !topicsDescriptor.isMatchingTopic(subscribed.getTopic());
                                if (noLongerMatching) {
                                    LOG.warn(
                                            "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                                            subscribed);
                                }
                                return noLongerMatching;
                            });
        }

        LOG.info(
                "Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                subtaskIndex,
                subscribedPartitionsToStartOffsets.size(),
                subscribedPartitionsToStartOffsets);
    }

    /**
     * Populates {@code subscribedPartitionsToStartOffsets} for a fresh start (no restored
     * state), using the discovered partitions as the initial seed.
     *
     * <p>For {@code SPECIFIC_OFFSETS} and {@code TIMESTAMP} the concrete offsets are set here;
     * every other startup mode only stores the mode's state sentinel, leaving the actual offset
     * to be determined lazily when the partition is read.
     *
     * @param discoveredPartitions partitions returned by the partition discoverer
     * @throws IllegalStateException if the startup mode requires specific offsets or a
     *     timestamp that were never provided
     * @throws Exception declared so that anything thrown by the timestamp-based offset lookup
     *     propagates unchanged to {@code open}
     */
    private void seedStartOffsetsFromStartupMode(List<KafkaTopicPartition> discoveredPartitions)
            throws Exception {
        switch (startupMode) {
            case SPECIFIC_OFFSETS:
                if (specificStartupOffsets == null) {
                    throw new IllegalStateException(
                            "Startup mode for the consumer set to "
                                    + StartupMode.SPECIFIC_OFFSETS
                                    + ", but no specific offsets were specified.");
                }

                for (KafkaTopicPartition partition : discoveredPartitions) {
                    final Long requestedOffset = specificStartupOffsets.get(partition);
                    // A user-provided offset names the next record to read, so the stored
                    // state is one less than it; partitions without a user-provided offset
                    // fall back to the group-offset sentinel.
                    final long startOffset =
                            (requestedOffset == null)
                                    ? KafkaTopicPartitionStateSentinel.GROUP_OFFSET
                                    : requestedOffset - 1;
                    subscribedPartitionsToStartOffsets.put(partition, startOffset);
                }
                break;

            case TIMESTAMP:
                if (startupOffsetsTimestamp == null) {
                    throw new IllegalStateException(
                            "Startup mode for the consumer set to "
                                    + StartupMode.TIMESTAMP
                                    + ", but no startup timestamp was specified.");
                }

                for (Map.Entry<KafkaTopicPartition, Long> fetched :
                        fetchOffsetsWithTimestamp(discoveredPartitions, startupOffsetsTimestamp)
                                .entrySet()) {
                    final Long offsetAtTimestamp = fetched.getValue();
                    if (offsetAtTimestamp == null) {
                        // no offset could be retrieved for the timestamp: use the
                        // latest-offset sentinel for this partition
                        subscribedPartitionsToStartOffsets.put(
                                fetched.getKey(), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
                    } else {
                        // the fetched offset is the next record to read, so the stored state
                        // is one less than it
                        subscribedPartitionsToStartOffsets.put(
                                fetched.getKey(), offsetAtTimestamp - 1);
                    }
                }
                break;

            default:
                for (KafkaTopicPartition partition : discoveredPartitions) {
                    subscribedPartitionsToStartOffsets.put(
                            partition, startupMode.getStateSentinel());
                }
        }
    }

    /**
     * Logs which partitions this subtask starts reading and from where, for a fresh start (no
     * restored state). For {@code SPECIFIC_OFFSETS} it additionally warns about partitions
     * whose start offset is the group-offset sentinel.
     *
     * @param subtaskIndex index of this subtask, used in the log messages
     */
    private void logStartupModeSubscription(int subtaskIndex) {
        if (subscribedPartitionsToStartOffsets.isEmpty()) {
            LOG.info(
                    "Consumer subtask {} initially has no partitions to read from.",
                    subtaskIndex);
            return;
        }

        final int partitionCount = subscribedPartitionsToStartOffsets.size();
        switch (startupMode) {
            case EARLIEST:
                LOG.info(
                        "Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                        subtaskIndex,
                        partitionCount,
                        subscribedPartitionsToStartOffsets.keySet());
                break;
            case LATEST:
                LOG.info(
                        "Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                        subtaskIndex,
                        partitionCount,
                        subscribedPartitionsToStartOffsets.keySet());
                break;
            case TIMESTAMP:
                LOG.info(
                        "Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
                        subtaskIndex,
                        partitionCount,
                        startupOffsetsTimestamp,
                        subscribedPartitionsToStartOffsets.keySet());
                break;
            case SPECIFIC_OFFSETS:
                LOG.info(
                        "Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                        subtaskIndex,
                        partitionCount,
                        specificStartupOffsets,
                        subscribedPartitionsToStartOffsets.keySet());

                // collect the partitions that ended up with the group-offset sentinel
                List<KafkaTopicPartition> defaultedToGroupOffsets =
                        new ArrayList<>(partitionCount);
                for (Map.Entry<KafkaTopicPartition, Long> subscription :
                        subscribedPartitionsToStartOffsets.entrySet()) {
                    if (subscription.getValue()
                            == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                        defaultedToGroupOffsets.add(subscription.getKey());
                    }
                }

                if (!defaultedToGroupOffsets.isEmpty()) {
                    LOG.warn(
                            "Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}"
                                    + "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                            subtaskIndex,
                            defaultedToGroupOffsets.size(),
                            defaultedToGroupOffsets);
                }
                break;
            case GROUP_OFFSETS:
                LOG.info(
                        "Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                        subtaskIndex,
                        partitionCount,
                        subscribedPartitionsToStartOffsets.keySet());
        }
    }