in flink-connector/flink-connector-gcp-pubsub/src/main/java/com/google/pubsub/flink/PubSubSource.java [143:164]
public SourceReader<OutputT, SubscriptionSplit> createReader(SourceReaderContext readerContext)
    throws Exception {
  // Builds the reader for this source: obtains the deserialization schema, opens it with an
  // initialization context backed by the supplied reader context, and then wires the opened
  // schema into a new PubSubSourceReader.
  //
  // readerContext: runtime context supplied by the caller; it provides the metric group and the
  //     user-code class loader exposed to the schema, and is also passed on to the reader.
  // Throws: any exception raised while obtaining or opening the schema, or while constructing
  //     the reader, is propagated unchanged.
  PubSubDeserializationSchema<OutputT> schema = deserializationSchema();

  // The schema is opened before the reader is constructed, so the reader receives an
  // already-initialized schema.
  schema.open(
      new DeserializationSchema.InitializationContext() {
        @Override
        public MetricGroup getMetricGroup() {
          return readerContext.metricGroup();
        }

        @Override
        public UserCodeClassLoader getUserCodeClassLoader() {
          // Forward the reader's user-code class loader instead of returning null, so that
          // schemas which resolve classes during open() do not fail with a
          // NullPointerException.
          return readerContext.getUserCodeClassLoader();
        }
      });

  // Each reader gets its own ack tracker and a freshly constructed Configuration.
  return new PubSubSourceReader<>(
      schema,
      new PubSubAckTracker(),
      this::createSplitReader,
      new Configuration(),
      readerContext);
}