protected KinesisDataFetcher()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [380:427]


    protected KinesisDataFetcher(
            final List<String> streams,
            final SourceFunction.SourceContext<T> sourceContext,
            final Object checkpointLock,
            final RuntimeContext runtimeContext,
            final Properties configProps,
            final KinesisDeserializationSchema<T> deserializationSchema,
            final KinesisShardAssigner shardAssigner,
            final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
            final WatermarkTracker watermarkTracker,
            final AtomicReference<Throwable> error,
            final List<KinesisStreamShardState> subscribedShardsState,
            final HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
            final FlinkKinesisProxyFactory kinesisProxyFactory,
            @Nullable final FlinkKinesisProxyV2Factory kinesisProxyV2Factory) {
        this.streams = checkNotNull(streams);
        this.configProps = checkNotNull(configProps);
        this.sourceContext = checkNotNull(sourceContext);
        this.checkpointLock = checkNotNull(checkpointLock);
        this.runtimeContext = checkNotNull(runtimeContext);
        this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
        this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
        this.deserializationSchema = checkNotNull(deserializationSchema);
        this.shardAssigner = checkNotNull(shardAssigner);
        this.periodicWatermarkAssigner = periodicWatermarkAssigner;
        this.watermarkTracker = watermarkTracker;
        this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
        this.kinesisProxyV2Factory = kinesisProxyV2Factory;
        this.kinesis = kinesisProxyFactory.create(configProps);
        this.recordPublisherFactory = createRecordPublisherFactory();

        this.consumerMetricGroup =
                runtimeContext
                        .getMetricGroup()
                        .addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);

        this.error = checkNotNull(error);
        this.subscribedShardsState = checkNotNull(subscribedShardsState);
        this.subscribedStreamsToLastDiscoveredShardIds =
                checkNotNull(subscribedStreamsToLastDiscoveredShardIds);

        this.shardConsumersExecutor =
                createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());

        this.recordEmitter = createRecordEmitter(configProps);

        StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(configProps, streams);
    }