in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java [54:88]
public PollingRecordPublisher create(
final StartingPosition startingPosition,
final Properties consumerConfig,
final MetricGroup metricGroup,
final StreamShardHandle streamShardHandle)
throws InterruptedException {
Preconditions.checkNotNull(startingPosition);
Preconditions.checkNotNull(consumerConfig);
Preconditions.checkNotNull(metricGroup);
Preconditions.checkNotNull(streamShardHandle);
final PollingRecordPublisherConfiguration configuration =
new PollingRecordPublisherConfiguration(consumerConfig);
final PollingRecordPublisherMetricsReporter metricsReporter =
new PollingRecordPublisherMetricsReporter(metricGroup);
final KinesisProxyInterface kinesisProxy = kinesisProxyFactory.create(consumerConfig);
if (configuration.isAdaptiveReads()) {
return new AdaptivePollingRecordPublisher(
startingPosition,
streamShardHandle,
metricsReporter,
kinesisProxy,
configuration.getMaxNumberOfRecordsPerFetch(),
configuration.getFetchIntervalMillis());
} else {
return new PollingRecordPublisher(
startingPosition,
streamShardHandle,
metricsReporter,
kinesisProxy,
configuration.getMaxNumberOfRecordsPerFetch(),
configuration.getFetchIntervalMillis());
}
}