in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java [560:754]
public void open(Configuration configuration) throws Exception {
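// called once per parallel subtask before the source starts running; this
// decides which partitions the subtask reads and from which offsets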
// determine the offset commit mode
this.offsetCommitMode =
OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
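// the resulting mode is one of ON_CHECKPOINTS, KAFKA_PERIODIC, or DISABLED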
// create the partition discoverer
this.partitionDiscoverer =
createPartitionDiscoverer(
topicsDescriptor,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks());
this.partitionDiscoverer.open();
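// maps each partition this subtask reads to its start offset; values are
// either concrete offsets or sentinel values resolved lazily by the fetcher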
subscribedPartitionsToStartOffsets = new HashMap<>();
final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
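// restoredState is non-null only when restoring from a snapshot; it holds
// the union of all subtasks' partition offsets from the checkpoint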
if (restoredState != null) {
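// partitions discovered now but missing from the restored state are new
// (e.g. created while the job was down); start reading them from EARLIEST
// so that no records are skipped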
for (KafkaTopicPartition partition : allPartitions) {
if (!restoredState.containsKey(partition)) {
restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
}
}
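// KafkaTopicPartitionAssigner.assign() deterministically maps each partition
// to exactly one subtask, so the subtasks pick disjoint subsets of the union
// state below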
for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry :
restoredState.entrySet()) {
// keep only the restored partitions that this subtask should subscribe,
// according to the assigner; the rest belong to other subtasks
if (KafkaTopicPartitionAssigner.assign(
restoredStateEntry.getKey(),
getRuntimeContext().getNumberOfParallelSubtasks())
== getRuntimeContext().getIndexOfThisSubtask()) {
subscribedPartitionsToStartOffsets.put(
restoredStateEntry.getKey(), restoredStateEntry.getValue());
}
}
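// optionally drop restored partitions whose topics no longer match the
// topics descriptor of the current job (this filtering can be disabled
// on the consumer)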
if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
subscribedPartitionsToStartOffsets
.entrySet()
.removeIf(
entry -> {
if (!topicsDescriptor.isMatchingTopic(
entry.getKey().getTopic())) {
LOG.warn(
"{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
entry.getKey());
return true;
}
return false;
});
}
LOG.info(
"Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets);
} else {
// use the partition discoverer to fetch the initial seed partitions,
// and set their initial offsets depending on the startup mode.
// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
// for the other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is
// lazily determined when the partition is actually read.
switch (startupMode) {
case SPECIFIC_OFFSETS:
if (specificStartupOffsets == null) {
throw new IllegalStateException(
"Startup mode for the consumer set to "
+ StartupMode.SPECIFIC_OFFSETS
+ ", but no specific offsets were specified.");
}
for (KafkaTopicPartition seedPartition : allPartitions) {
Long specificOffset = specificStartupOffsets.get(seedPartition);
if (specificOffset != null) {
// since the specified offset represents the next record to read, subtract
// one so that the initial state of the consumer is correct
subscribedPartitionsToStartOffsets.put(
seedPartition, specificOffset - 1);
} else {
// default to group offset behaviour if the user-provided specific offsets
// do not contain a value for this partition
subscribedPartitionsToStartOffsets.put(
seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
}
}
break;
case TIMESTAMP:
if (startupOffsetsTimestamp == null) {
throw new IllegalStateException(
"Startup mode for the consumer set to "
+ StartupMode.TIMESTAMP
+ ", but no startup timestamp was specified.");
}
for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset :
fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp)
.entrySet()) {
subscribedPartitionsToStartOffsets.put(
partitionToOffset.getKey(),
(partitionToOffset.getValue() == null)
// if an offset cannot be retrieved for a partition with the given
// timestamp, default to using the latest offset for the partition
? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
// since the fetched offset represents the next record to read,
// subtract one so that the initial state of the consumer is
// correct
: partitionToOffset.getValue() - 1);
}
break;
default:
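// EARLIEST, LATEST, and GROUP_OFFSETS: store the mode's sentinel value;
// the actual offset is resolved when consumption starts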
for (KafkaTopicPartition seedPartition : allPartitions) {
subscribedPartitionsToStartOffsets.put(
seedPartition, startupMode.getStateSentinel());
}
}
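// log what was decided above, per startup mode; for SPECIFIC_OFFSETS also
// warn about partitions that fell back to committed group offsets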
if (!subscribedPartitionsToStartOffsets.isEmpty()) {
switch (startupMode) {
case EARLIEST:
LOG.info(
"Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
break;
case LATEST:
LOG.info(
"Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
break;
case TIMESTAMP:
LOG.info(
"Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
startupOffsetsTimestamp,
subscribedPartitionsToStartOffsets.keySet());
break;
case SPECIFIC_OFFSETS:
LOG.info(
"Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
specificStartupOffsets,
subscribedPartitionsToStartOffsets.keySet());
List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets =
new ArrayList<>(subscribedPartitionsToStartOffsets.size());
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
subscribedPartitionsToStartOffsets.entrySet()) {
if (subscribedPartition.getValue()
== KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
}
}
if (!partitionsDefaultedToGroupOffsets.isEmpty()) {
LOG.warn(
"Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}"
+ "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
getRuntimeContext().getIndexOfThisSubtask(),
partitionsDefaultedToGroupOffsets.size(),
partitionsDefaultedToGroupOffsets);
}
break;
case GROUP_OFFSETS:
LOG.info(
"Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
}
} else {
LOG.info(
"Consumer subtask {} initially has no partitions to read from.",
getRuntimeContext().getIndexOfThisSubtask());
}
}
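// let the deserialization schema register user metrics and otherwise
// initialize itself with the runtime context before records start flowing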
this.deserializer.open(
RuntimeContextInitializationContextAdapters.deserializationAdapter(
getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
}
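// ------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this file): how a pipeline
// typically selects the startup modes handled above. The topic name,
// bootstrap servers, group id, and timestamp below are placeholder values.
//
//     Properties props = new Properties();
//     props.setProperty("bootstrap.servers", "localhost:9092");
//     props.setProperty("group.id", "example-group");
//
//     FlinkKafkaConsumer<String> consumer =
//             new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
//
//     // each setter maps to one case of the startup-mode switch in open():
//     consumer.setStartFromEarliest();                // StartupMode.EARLIEST
//     consumer.setStartFromLatest();                  // StartupMode.LATEST
//     consumer.setStartFromGroupOffsets();            // StartupMode.GROUP_OFFSETS (default)
//     consumer.setStartFromTimestamp(1609459200000L); // StartupMode.TIMESTAMP
//
//     env.addSource(consumer);
// ------------------------------------------------------------------------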