public void run()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/ShardConsumer.java [109:147]


	public void run() {
		try {
			while (isRunning()) {
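				// Run the record publisher; the callback below is invoked for each batch of records read from the shard.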
				final RecordPublisherRunResult result = recordPublisher.run(batch -> {
					if (!batch.getDeaggregatedRecords().isEmpty()) {
						LOG.debug("stream: {}, shard: {}, millis behind latest: {}, batch size: {}",
							subscribedShard.getStreamName(), subscribedShard.getShard().getShardId(),
							batch.getMillisBehindLatest(), batch.getDeaggregatedRecordSize());
					}
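					// Deserialize and collect each de-aggregated record that has not already been processed, updating the last seen sequence number.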
					for (UserRecord userRecord : batch.getDeaggregatedRecords()) {
						if (filterDeaggregatedRecord(userRecord)) {
							deserializeRecordForCollectionAndUpdateState(userRecord);
						}
					}

					// Report per-batch metrics for this shard.
					shardConsumerMetricsReporter.setAverageRecordSizeBytes(batch.getAverageRecordSizeBytes());
					shardConsumerMetricsReporter.setNumberOfAggregatedRecords(batch.getAggregatedRecordSize());
					shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(batch.getDeaggregatedRecordSize());
					ofNullable(batch.getMillisBehindLatest()).ifPresent(shardConsumerMetricsReporter::setMillisBehindLatest);

					// Hand the sequence number of the last successfully processed record back to the publisher.
					return lastSequenceNum;
				});

				if (result == COMPLETE) {
					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
					// we can close this consumer thread once we've reached the end of the subscribed shard
					break;
				} else if (result == CANCELLED) {
					final String errorMessage = "Shard consumer cancelled: " + subscribedShard.getShard().getShardId();
					LOG.info(errorMessage);
					throw new ShardConsumerCancelledException(errorMessage);
				}
			}
		} catch (Throwable t) {
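			// Surface any failure, including cancellation, to the owning fetcher.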
			fetcherRef.stopWithError(t);
		} finally {
			this.shardConsumerMetricsReporter.unregister();
		}
	}
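
The lambda passed to recordPublisher.run(...) is the publisher's batch-consumer callback, and its return value is the sequence number from which the publisher continues reading. As a rough orientation only, the contract it implements looks approximately like the sketch below; the exact interface names, nesting, and signatures in the connector's internals/publisher package may differ, so treat this as an assumption rather than the connector's actual API.

	// Approximate shape of the publisher contract used above (an assumption, not the verbatim source).
	public interface RecordPublisher {

		// Reads records and feeds them to the consumer, returning why the run stopped.
		RecordPublisherRunResult run(RecordBatchConsumer consumer) throws InterruptedException;

		enum RecordPublisherRunResult {
			COMPLETE,    // the end of the shard has been reached
			INCOMPLETE,  // more records remain; call run() again
			CANCELLED    // the subscription/consumer was cancelled
		}

		interface RecordBatchConsumer {
			// Consumes one batch and returns the sequence number of the last
			// record that was successfully processed, i.e. where to resume from.
			SequenceNumber accept(RecordBatch recordBatch);
		}
	}

Under that contract, returning lastSequenceNum from the callback is what lets the publisher resume the next read exactly where the previous batch left off.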