in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java [381:503]
/**
 * Adds the given partitions to the consumer's current assignment.
 *
 * <p>While the reassignment is in progress the consumer is detached from {@code this.consumer}
 * (set to {@code null} under {@code consumerReassignmentLock}) so that concurrent callers cannot
 * invoke {@code wakeup()} on it. Already-assigned partitions keep their current positions. New
 * partitions whose state offset is still a sentinel value are seeked accordingly and their state
 * is updated with the resolved offset; all other new partitions are seeked to {@code offset + 1}.
 *
 * <p>Outcomes:
 *
 * <ul>
 *   <li>Success: the consumer is re-exposed and a wakeup that was buffered in the meantime is
 *       delivered to it.
 *   <li>{@link WakeupException}: the consumer is re-exposed, any assignment change is rolled
 *       back, the buffered wakeup flag is cleared, the new partitions are put back into {@code
 *       unassignedPartitionsQueue}, and an {@code AbortedReassignmentException} is thrown.
 *   <li>Any other failure: the consumer is re-exposed as-is (no rollback) so that it does not
 *       stay detached forever, and the original failure is rethrown unchanged.
 * </ul>
 *
 * @param newPartitions states of the partitions to add; an empty list makes this a no-op
 * @throws Exception if the reassignment was aborted by a wakeup (as {@code
 *     AbortedReassignmentException}) or if any consumer call failed
 */
void reassignPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions)
        throws Exception {
    if (newPartitions.isEmpty()) {
        return;
    }
    hasAssignedPartitions = true;
    boolean reassignmentStarted = false;

    // since the reassignment may introduce several Kafka blocking calls that cannot be
    // interrupted, the consumer needs to be isolated from external wakeup calls in
    // setOffsetsToCommit() and shutdown() until the reassignment is complete.
    final KafkaConsumer<byte[], byte[]> consumerTmp;
    synchronized (consumerReassignmentLock) {
        consumerTmp = this.consumer;
        this.consumer = null;
    }

    final Map<TopicPartition, Long> oldPartitionAssignmentsToPosition = new HashMap<>();
    try {
        // remember where every currently assigned partition stands; needed both for the
        // re-seek after assign() and for a potential rollback
        for (TopicPartition oldPartition : consumerTmp.assignment()) {
            oldPartitionAssignmentsToPosition.put(
                    oldPartition, consumerTmp.position(oldPartition));
        }

        final List<TopicPartition> newPartitionAssignments =
                new ArrayList<>(
                        newPartitions.size() + oldPartitionAssignmentsToPosition.size());
        newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet());
        newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions));

        // reassign with the new partitions
        consumerTmp.assign(newPartitionAssignments);
        reassignmentStarted = true;

        // old partitions should be seeked to their previous position
        for (Map.Entry<TopicPartition, Long> oldPartitionToPosition :
                oldPartitionAssignmentsToPosition.entrySet()) {
            consumerTmp.seek(
                    oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
        }

        // offsets in the state of new partitions may still be placeholder sentinel values if
        // we are:
        //   (1) starting fresh,
        //   (2) checkpoint / savepoint state we were restored with had not completely
        //       been replaced with actual offset values yet, or
        //   (3) the partition was newly discovered after startup;
        // replace those with actual offsets, according to what the sentinel value represents.
        // The state stores "position - 1", mirroring the "offset + 1" used when seeking to a
        // concrete state offset below.
        for (KafkaTopicPartitionState<T, TopicPartition> newPartitionState : newPartitions) {
            final TopicPartition partitionHandle = newPartitionState.getKafkaPartitionHandle();

            if (newPartitionState.getOffset()
                    == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
                consumerTmp.seekToBeginning(Collections.singletonList(partitionHandle));
                newPartitionState.setOffset(consumerTmp.position(partitionHandle) - 1);
            } else if (newPartitionState.getOffset()
                    == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
                consumerTmp.seekToEnd(Collections.singletonList(partitionHandle));
                newPartitionState.setOffset(consumerTmp.position(partitionHandle) - 1);
            } else if (newPartitionState.getOffset()
                    == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                // the KafkaConsumer by default will automatically seek the consumer position
                // to the committed group offset, so we do not need to do it.
                newPartitionState.setOffset(consumerTmp.position(partitionHandle) - 1);
            } else {
                consumerTmp.seek(partitionHandle, newPartitionState.getOffset() + 1);
            }
        }
    } catch (WakeupException e) {
        // a WakeupException may be thrown if the consumer was invoked wakeup()
        // before it was isolated for the reassignment. In this case, we abort the
        // reassignment and just re-expose the original consumer.
        synchronized (consumerReassignmentLock) {
            this.consumer = consumerTmp;

            // if reassignment had already started and affected the consumer,
            // we do a full roll back so that it is as if it was left untouched
            if (reassignmentStarted) {
                this.consumer.assign(
                        new ArrayList<>(oldPartitionAssignmentsToPosition.keySet()));

                for (Map.Entry<TopicPartition, Long> oldPartitionToPosition :
                        oldPartitionAssignmentsToPosition.entrySet()) {
                    this.consumer.seek(
                            oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
                }
            }

            // no need to restore the wakeup state in this case,
            // since only the last wakeup call is effective anyways
            hasBufferedWakeup = false;

            // re-add all new partitions back to the unassigned partitions queue to be picked
            // up again
            for (KafkaTopicPartitionState<T, TopicPartition> newPartition : newPartitions) {
                unassignedPartitionsQueue.add(newPartition);
            }

            // this signals the main fetch loop to continue through the loop
            throw new AbortedReassignmentException();
        }
    } catch (Throwable t) {
        // Any other failure would otherwise leave this.consumer == null for good, making the
        // consumer unreachable for whoever needs to wake it up or close it afterwards.
        // Re-expose it untouched (no rollback, buffered wakeup flag left as-is) and let the
        // original failure propagate.
        synchronized (consumerReassignmentLock) {
            this.consumer = consumerTmp;
        }
        throw t;
    }

    // reassignment complete; expose the reassigned consumer
    synchronized (consumerReassignmentLock) {
        this.consumer = consumerTmp;

        // restore wakeup state for the consumer if necessary
        if (hasBufferedWakeup) {
            this.consumer.wakeup();
            hasBufferedWakeup = false;
        }
    }
}