public void run()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java [110:164]


    public void run() {
        try {
            while (isRunning()) {
                final RecordPublisherRunResult result =
                        recordPublisher.run(
                                batch -> {
                                    if (!batch.getDeaggregatedRecords().isEmpty()) {
                                        LOG.debug(
                                                "stream: {}, shard: {}, millis behind latest: {}, batch size: {}",
                                                subscribedShard.getStreamName(),
                                                subscribedShard.getShard().getShardId(),
                                                batch.getMillisBehindLatest(),
                                                batch.getDeaggregatedRecordSize());
                                    }
                                    for (UserRecord userRecord : batch.getDeaggregatedRecords()) {
                                        if (filterDeaggregatedRecord(userRecord)) {
                                            deserializeRecordForCollectionAndUpdateState(
                                                    userRecord);
                                        }
                                    }

                                    shardConsumerMetricsReporter.setAverageRecordSizeBytes(
                                            batch.getAverageRecordSizeBytes());
                                    shardConsumerMetricsReporter.setNumberOfAggregatedRecords(
                                            batch.getAggregatedRecordSize());
                                    shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(
                                            batch.getDeaggregatedRecordSize());
                                    ofNullable(batch.getMillisBehindLatest())
                                            .ifPresent(
                                                    shardConsumerMetricsReporter
                                                            ::setMillisBehindLatest);

                                    return lastSequenceNum;
                                });

                if (result == COMPLETE) {
                    fetcherRef.updateState(
                            subscribedShardStateIndex,
                            SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
                    // we can close this consumer thread once we've reached the end of the
                    // subscribed shard
                    break;
                } else if (isRunning() && result == CANCELLED) {
                    final String errorMessage =
                            "Shard consumer cancelled: " + subscribedShard.getShard().getShardId();
                    LOG.info(errorMessage);
                    throw new ShardConsumerCancelledException(errorMessage);
                }
            }
        } catch (Throwable t) {
            fetcherRef.stopWithError(t);
        } finally {
            this.shardConsumerMetricsReporter.unregister();
        }
    }