flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [571:587]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                // Obtain the deserialization schema instance used for this shard's consumer.
                // Presumably a separate copy per shard, going by the helper's name -- confirm
                // in getClonedDeserializationSchema().
                KinesisDeserializationSchema<T> shardDeserializationSchema =
                        getClonedDeserializationSchema();
                // Open the schema before the shard consumer is submitted to the executor,
                // handing it an initialization context adapted from the runtime context.
                // NOTE(review): this exact open(...) sequence occurs twice in this class;
                // consider extracting a shared private helper.
                shardDeserializationSchema.open(
                        RuntimeContextInitializationContextAdapters.deserializationAdapter(
                                runtimeContext,
                                // The metric group passed into this function is deliberately
                                // ignored. The returned group is built from consumerMetricGroup
                                // instead, nested as:
                                //   subtaskId=<index of this subtask> -> shardId=<shard id> -> "user"
                                metricGroup ->
                                        consumerMetricGroup
                                                .addGroup(
                                                        "subtaskId",
                                                        String.valueOf(indexOfThisConsumerSubtask))
                                                .addGroup(
                                                        "shardId",
                                                        streamShardHandle.getShard().getShardId())
                                                .addGroup("user")));
                shardConsumersExecutor.submit(
                        createShardConsumer(
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [718:734]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                // Obtain the deserialization schema instance used for this shard's consumer.
                // Presumably a separate copy per shard, going by the helper's name -- confirm
                // in getClonedDeserializationSchema().
                KinesisDeserializationSchema<T> shardDeserializationSchema =
                        getClonedDeserializationSchema();
                // Open the schema before the shard consumer is submitted to the executor,
                // handing it an initialization context adapted from the runtime context.
                // NOTE(review): this exact open(...) sequence occurs twice in this class;
                // consider extracting a shared private helper.
                shardDeserializationSchema.open(
                        RuntimeContextInitializationContextAdapters.deserializationAdapter(
                                runtimeContext,
                                // The metric group passed into this function is deliberately
                                // ignored. The returned group is built from consumerMetricGroup
                                // instead, nested as:
                                //   subtaskId=<index of this subtask> -> shardId=<shard id> -> "user"
                                metricGroup ->
                                        consumerMetricGroup
                                                .addGroup(
                                                        "subtaskId",
                                                        String.valueOf(indexOfThisConsumerSubtask))
                                                .addGroup(
                                                        "shardId",
                                                        streamShardHandle.getShard().getShardId())
                                                .addGroup("user")));
                shardConsumersExecutor.submit(
                        createShardConsumer(
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



