in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java [60:94]
/**
 * Creates a {@code KafkaShuffleFetcher}.
 *
 * <p>All arguments except {@code typeSerializer} and {@code producerParallelism} are handed to
 * the superclass constructor unchanged and in the same order. The two remaining arguments are
 * used to build this fetcher's {@code KafkaShuffleElementDeserializer} and
 * {@code WatermarkHandler}.
 *
 * @param sourceContext forwarded to the superclass constructor
 * @param assignedPartitionsWithInitialOffsets forwarded to the superclass constructor
 * @param watermarkStrategy forwarded to the superclass constructor
 * @param processingTimeProvider forwarded to the superclass constructor
 * @param autoWatermarkInterval forwarded to the superclass constructor
 * @param userCodeClassLoader forwarded to the superclass constructor
 * @param taskNameWithSubtasks forwarded to the superclass constructor
 * @param deserializer forwarded to the superclass constructor
 * @param kafkaProperties forwarded to the superclass constructor
 * @param pollTimeout forwarded to the superclass constructor
 * @param subtaskMetricGroup forwarded to the superclass constructor
 * @param consumerMetricGroup forwarded to the superclass constructor
 * @param useMetrics forwarded to the superclass constructor
 * @param typeSerializer serializer handed to the new {@code KafkaShuffleElementDeserializer}
 * @param producerParallelism value handed to the new {@code WatermarkHandler}; presumably the
 *     number of upstream producer subtasks (NOTE(review): confirm against WatermarkHandler)
 * @throws Exception declared by this constructor; presumably propagated from the superclass
 *     constructor (NOTE(review): confirm)
 */
public KafkaShuffleFetcher(
        SourceFunction.SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval,
        ClassLoader userCodeClassLoader,
        String taskNameWithSubtasks,
        KafkaDeserializationSchema<T> deserializer,
        Properties kafkaProperties,
        long pollTimeout,
        MetricGroup subtaskMetricGroup,
        MetricGroup consumerMetricGroup,
        boolean useMetrics,
        TypeSerializer<T> typeSerializer,
        int producerParallelism)
        throws Exception {
    // Everything except the two trailing, shuffle-specific arguments goes to the superclass.
    super(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarkStrategy,
            processingTimeProvider,
            autoWatermarkInterval,
            userCodeClassLoader,
            taskNameWithSubtasks,
            deserializer,
            kafkaProperties,
            pollTimeout,
            subtaskMetricGroup,
            consumerMetricGroup,
            useMetrics);

    // Shuffle-specific state, built from the two arguments the superclass does not receive.
    kafkaShuffleDeserializer = new KafkaShuffleElementDeserializer<>(typeSerializer);
    watermarkHandler = new WatermarkHandler(producerParallelism);
}