void reassignPartitions()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java [381:503]


    void reassignPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions)
            throws Exception {
        if (newPartitions.size() == 0) {
            return;
        }
        hasAssignedPartitions = true;
        boolean reassignmentStarted = false;

        // since the reassignment may introduce several Kafka blocking calls that cannot be
        // interrupted,
        // the consumer needs to be isolated from external wakeup calls in setOffsetsToCommit() and
        // shutdown()
        // until the reassignment is complete.
        final KafkaConsumer<byte[], byte[]> consumerTmp;
        synchronized (consumerReassignmentLock) {
            consumerTmp = this.consumer;
            this.consumer = null;
        }

        final Map<TopicPartition, Long> oldPartitionAssignmentsToPosition = new HashMap<>();
        try {
            for (TopicPartition oldPartition : consumerTmp.assignment()) {
                oldPartitionAssignmentsToPosition.put(
                        oldPartition, consumerTmp.position(oldPartition));
            }

            final List<TopicPartition> newPartitionAssignments =
                    new ArrayList<>(
                            newPartitions.size() + oldPartitionAssignmentsToPosition.size());
            newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet());
            newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions));

            // reassign with the new partitions
            consumerTmp.assign(newPartitionAssignments);
            reassignmentStarted = true;

            // old partitions should be seeked to their previous position
            for (Map.Entry<TopicPartition, Long> oldPartitionToPosition :
                    oldPartitionAssignmentsToPosition.entrySet()) {
                consumerTmp.seek(
                        oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
            }

            // offsets in the state of new partitions may still be placeholder sentinel values if we
            // are:
            //   (1) starting fresh,
            //   (2) checkpoint / savepoint state we were restored with had not completely
            //       been replaced with actual offset values yet, or
            //   (3) the partition was newly discovered after startup;
            // replace those with actual offsets, according to what the sentinel value represent.
            for (KafkaTopicPartitionState<T, TopicPartition> newPartitionState : newPartitions) {
                if (newPartitionState.getOffset()
                        == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
                    consumerTmp.seekToBeginning(
                            Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
                    newPartitionState.setOffset(
                            consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
                } else if (newPartitionState.getOffset()
                        == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
                    consumerTmp.seekToEnd(
                            Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
                    newPartitionState.setOffset(
                            consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
                } else if (newPartitionState.getOffset()
                        == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                    // the KafkaConsumer by default will automatically seek the consumer position
                    // to the committed group offset, so we do not need to do it.

                    newPartitionState.setOffset(
                            consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
                } else {
                    consumerTmp.seek(
                            newPartitionState.getKafkaPartitionHandle(),
                            newPartitionState.getOffset() + 1);
                }
            }
        } catch (WakeupException e) {
            // a WakeupException may be thrown if the consumer was invoked wakeup()
            // before it was isolated for the reassignment. In this case, we abort the
            // reassignment and just re-expose the original consumer.

            synchronized (consumerReassignmentLock) {
                this.consumer = consumerTmp;

                // if reassignment had already started and affected the consumer,
                // we do a full roll back so that it is as if it was left untouched
                if (reassignmentStarted) {
                    this.consumer.assign(
                            new ArrayList<>(oldPartitionAssignmentsToPosition.keySet()));

                    for (Map.Entry<TopicPartition, Long> oldPartitionToPosition :
                            oldPartitionAssignmentsToPosition.entrySet()) {
                        this.consumer.seek(
                                oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
                    }
                }

                // no need to restore the wakeup state in this case,
                // since only the last wakeup call is effective anyways
                hasBufferedWakeup = false;

                // re-add all new partitions back to the unassigned partitions queue to be picked up
                // again
                for (KafkaTopicPartitionState<T, TopicPartition> newPartition : newPartitions) {
                    unassignedPartitionsQueue.add(newPartition);
                }

                // this signals the main fetch loop to continue through the loop
                throw new AbortedReassignmentException();
            }
        }

        // reassignment complete; expose the reassigned consumer
        synchronized (consumerReassignmentLock) {
            this.consumer = consumerTmp;

            // restore wakeup state for the consumer if necessary
            if (hasBufferedWakeup) {
                this.consumer.wakeup();
                hasBufferedWakeup = false;
            }
        }
    }