protected KinesisDataFetcher()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/KinesisDataFetcher.java [398:442]


	protected KinesisDataFetcher(
			final List<String> streams,
			final SourceFunction.SourceContext<T> sourceContext,
			final Object checkpointLock,
			final RuntimeContext runtimeContext,
			final Properties configProps,
			final KinesisDeserializationSchema<T> deserializationSchema,
			final KinesisShardAssigner shardAssigner,
			final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
			final WatermarkTracker watermarkTracker,
			final AtomicReference<Throwable> error,
			final List<KinesisStreamShardState> subscribedShardsState,
			final HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
			final FlinkKinesisProxyFactory kinesisProxyFactory,
			@Nullable final FlinkKinesisProxyV2Factory kinesisProxyV2Factory) {
		this.streams = checkNotNull(streams);
		this.configProps = checkNotNull(configProps);
		this.sourceContext = checkNotNull(sourceContext);
		this.checkpointLock = checkNotNull(checkpointLock);
		this.runtimeContext = checkNotNull(runtimeContext);
		this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
		this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
		this.deserializationSchema = checkNotNull(deserializationSchema);
		this.shardAssigner = checkNotNull(shardAssigner);
		this.periodicWatermarkAssigner = periodicWatermarkAssigner;
		this.watermarkTracker = watermarkTracker;
		this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
		this.kinesisProxyV2Factory = kinesisProxyV2Factory;
		this.kinesis = kinesisProxyFactory.create(configProps);
		this.recordPublisherFactory = createRecordPublisherFactory();

		this.consumerMetricGroup = runtimeContext.getMetricGroup()
				.addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);

		this.error = checkNotNull(error);
		this.subscribedShardsState = checkNotNull(subscribedShardsState);
		this.subscribedStreamsToLastDiscoveredShardIds = checkNotNull(subscribedStreamsToLastDiscoveredShardIds);

		this.shardConsumersExecutor =
				createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());

		this.recordEmitter = createRecordEmitter(configProps);

		StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(configProps, streams);
	}