in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [380:427]
/**
 * Creates a fetcher from fully injected collaborators.
 *
 * <p>Most arguments are passed through {@code checkNotNull} before being stored. Three are
 * stored as-is, without a null check: {@code periodicWatermarkAssigner}, {@code
 * watermarkTracker} and {@code kinesisProxyV2Factory} (the latter is annotated {@code
 * @Nullable}).
 *
 * <p>Besides storing its arguments, the constructor performs work in this order: it creates the
 * Kinesis proxy from {@code kinesisProxyFactory}, calls {@code createRecordPublisherFactory()},
 * adds the consumer metric group to the runtime context's metric group, calls {@code
 * createShardConsumersThreadPool(...)} and {@code createRecordEmitter(...)}, and finally calls
 * {@code StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(configProps, streams)}.
 *
 * <p>NOTE(review): the {@code create*} helpers are defined outside this view. If they are
 * instance (possibly overridable) methods, they run while this object is only partially
 * initialized, so the statement order below should be treated as significant - confirm before
 * reordering anything.
 *
 * @param streams list of strings identifying the streams; null-checked, stored, and passed to
 *     the stream consumer registration call at the end of the constructor
 * @param sourceContext source context; null-checked and stored
 * @param checkpointLock lock object; null-checked and stored
 * @param runtimeContext runtime context; null-checked and stored, and queried here for the
 *     number of parallel subtasks, the index of this subtask, the metric group and the task
 *     name with subtasks
 * @param configProps configuration properties; null-checked and stored, and passed to the
 *     proxy factory, the record emitter factory method and the stream consumer registration
 * @param deserializationSchema deserialization schema; null-checked and stored
 * @param shardAssigner shard assigner; null-checked and stored
 * @param periodicWatermarkAssigner watermark assigner; stored without a null check
 * @param watermarkTracker watermark tracker; stored without a null check
 * @param error reference for a {@code Throwable}; null-checked and stored
 * @param subscribedShardsState list of shard states; null-checked and stored (the reference
 *     itself is kept, no copy is made here)
 * @param subscribedStreamsToLastDiscoveredShardIds string-to-string map; null-checked and
 *     stored (the reference itself is kept, no copy is made here)
 * @param kinesisProxyFactory factory used here to create the Kinesis proxy; null-checked and
 *     stored
 * @param kinesisProxyV2Factory second proxy factory; may be {@code null} per its annotation
 *     and is stored without a null check
 */
protected KinesisDataFetcher(
final List<String> streams,
final SourceFunction.SourceContext<T> sourceContext,
final Object checkpointLock,
final RuntimeContext runtimeContext,
final Properties configProps,
final KinesisDeserializationSchema<T> deserializationSchema,
final KinesisShardAssigner shardAssigner,
final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
final WatermarkTracker watermarkTracker,
final AtomicReference<Throwable> error,
final List<KinesisStreamShardState> subscribedShardsState,
final HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
final FlinkKinesisProxyFactory kinesisProxyFactory,
@Nullable final FlinkKinesisProxyV2Factory kinesisProxyV2Factory) {
// Null-checked collaborators stored directly.
this.streams = checkNotNull(streams);
this.configProps = checkNotNull(configProps);
this.sourceContext = checkNotNull(sourceContext);
this.checkpointLock = checkNotNull(checkpointLock);
this.runtimeContext = checkNotNull(runtimeContext);
// Subtask count and index are read from the runtime context once, here, and stored.
this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
this.deserializationSchema = checkNotNull(deserializationSchema);
this.shardAssigner = checkNotNull(shardAssigner);
// The two watermark-related arguments are stored without a null check.
this.periodicWatermarkAssigner = periodicWatermarkAssigner;
this.watermarkTracker = watermarkTracker;
this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
// Declared @Nullable in the signature, so deliberately not null-checked.
this.kinesisProxyV2Factory = kinesisProxyV2Factory;
// The proxy is created eagerly from the already validated configProps.
this.kinesis = kinesisProxyFactory.create(configProps);
// NOTE(review): createRecordPublisherFactory() takes no arguments, so it presumably reads
// fields assigned above (possibly the nullable V2 factory) - confirm it tolerates null.
this.recordPublisherFactory = createRecordPublisherFactory();
this.consumerMetricGroup =
runtimeContext
.getMetricGroup()
.addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);
// These null checks run only after the proxy and the record publisher factory were created.
this.error = checkNotNull(error);
this.subscribedShardsState = checkNotNull(subscribedShardsState);
this.subscribedStreamsToLastDiscoveredShardIds =
checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
// The task name with subtasks is handed to the pool factory method (presumably for thread
// naming - verify in createShardConsumersThreadPool).
this.shardConsumersExecutor =
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
this.recordEmitter = createRecordEmitter(configProps);
// Last step; no return value is used. NOTE(review): the name suggests registration happens
// only under a lazy registration setting in configProps - confirm in
// StreamConsumerRegistrarUtil.
StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(configProps, streams);
}