public SourceReader createReader()

in flink-connector/flink-connector-gcp-pubsub/src/main/java/com/google/pubsub/flink/PubSubSource.java [143:164]


  /**
   * Creates the source reader for this source.
   *
   * <p>The deserialization schema returned by {@code deserializationSchema()} is opened first,
   * using an initialization context that delegates to the supplied reader context, and is then
   * handed to a new {@code PubSubSourceReader} together with a new {@code PubSubAckTracker}, the
   * {@code createSplitReader} factory of this source, and a newly created {@code Configuration}.
   *
   * @param readerContext the context of the reader being created; supplies the metric group and
   *     the user-code class loader exposed to the deserialization schema
   * @return a new reader wired to the opened deserialization schema
   * @throws Exception if opening the deserialization schema or constructing the reader fails
   */
  public SourceReader<OutputT, SubscriptionSplit> createReader(SourceReaderContext readerContext)
      throws Exception {
    PubSubDeserializationSchema<OutputT> schema = deserializationSchema();
    schema.open(
        new DeserializationSchema.InitializationContext() {
          @Override
          public MetricGroup getMetricGroup() {
            return readerContext.metricGroup();
          }

          @Override
          public UserCodeClassLoader getUserCodeClassLoader() {
            // Delegate to the reader context instead of returning null, so a schema that
            // resolves user classes during open() receives a usable class loader.
            return readerContext.getUserCodeClassLoader();
          }
        });
    return new PubSubSourceReader<>(
        schema,
        new PubSubAckTracker(),
        this::createSplitReader,
        new Configuration(),
        readerContext);
  }