public PollingRecordPublisher create()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java [54:88]


    /**
     * Builds the polling record publisher for the given shard.
     *
     * <p>Every argument is passed through {@code Preconditions.checkNotNull} before any
     * collaborator is constructed. The consumer properties are then used twice: once to derive
     * the {@link PollingRecordPublisherConfiguration} and once to obtain a
     * {@link KinesisProxyInterface} from this factory's {@code kinesisProxyFactory}.
     *
     * @param startingPosition position handed to the created publisher
     * @param consumerConfig properties used to build the publisher configuration and the proxy
     * @param metricGroup metric group wrapped in a {@link PollingRecordPublisherMetricsReporter}
     * @param streamShardHandle shard handle handed to the created publisher
     * @return an {@link AdaptivePollingRecordPublisher} when the derived configuration reports
     *     adaptive reads, otherwise a plain {@link PollingRecordPublisher}
     * @throws InterruptedException declared by this method; propagated from the calls made while
     *     assembling the publisher
     */
    public PollingRecordPublisher create(
            final StartingPosition startingPosition,
            final Properties consumerConfig,
            final MetricGroup metricGroup,
            final StreamShardHandle streamShardHandle)
            throws InterruptedException {
        // Validate all inputs up front, in declaration order.
        Preconditions.checkNotNull(startingPosition);
        Preconditions.checkNotNull(consumerConfig);
        Preconditions.checkNotNull(metricGroup);
        Preconditions.checkNotNull(streamShardHandle);

        // Collaborators shared by both publisher flavours.
        final PollingRecordPublisherConfiguration publisherConfig =
                new PollingRecordPublisherConfiguration(consumerConfig);
        final PollingRecordPublisherMetricsReporter reporter =
                new PollingRecordPublisherMetricsReporter(metricGroup);
        final KinesisProxyInterface proxy = kinesisProxyFactory.create(consumerConfig);

        // Non-adaptive configuration: hand back the plain publisher straight away.
        if (!publisherConfig.isAdaptiveReads()) {
            return new PollingRecordPublisher(
                    startingPosition,
                    streamShardHandle,
                    reporter,
                    proxy,
                    publisherConfig.getMaxNumberOfRecordsPerFetch(),
                    publisherConfig.getFetchIntervalMillis());
        }

        // Adaptive reads requested: same constructor arguments, adaptive implementation.
        return new AdaptivePollingRecordPublisher(
                startingPosition,
                streamShardHandle,
                reporter,
                proxy,
                publisherConfig.getMaxNumberOfRecordsPerFetch(),
                publisherConfig.getFetchIntervalMillis());
    }