in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java [118:145]
public KafkaSourceEnumerator(
KafkaSubscriber subscriber,
OffsetsInitializer startingOffsetInitializer,
OffsetsInitializer stoppingOffsetInitializer,
Properties properties,
SplitEnumeratorContext<KafkaPartitionSplit> context,
Boundedness boundedness,
KafkaSourceEnumState kafkaSourceEnumState) {
this.subscriber = subscriber;
this.startingOffsetInitializer = startingOffsetInitializer;
this.stoppingOffsetInitializer = stoppingOffsetInitializer;
this.newDiscoveryOffsetsInitializer = OffsetsInitializer.earliest();
this.properties = properties;
this.context = context;
this.boundedness = boundedness;
this.assignedPartitions = new HashSet<>(kafkaSourceEnumState.assignedPartitions());
this.pendingPartitionSplitAssignment = new HashMap<>();
this.partitionDiscoveryIntervalMs =
KafkaSourceOptions.getOption(
properties,
KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS,
Long::parseLong);
this.consumerGroupId = properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
this.unassignedInitialPartitions =
new HashSet<>(kafkaSourceEnumState.unassignedInitialPartitions());
this.initialDiscoveryFinished = kafkaSourceEnumState.initialDiscoveryFinished();
}