in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/ShardConsumer.java [109:147]
public void run() {
try {
while (isRunning()) {
final RecordPublisherRunResult result = recordPublisher.run(batch -> {
if (!batch.getDeaggregatedRecords().isEmpty()) {
LOG.debug("stream: {}, shard: {}, millis behind latest: {}, batch size: {}",
subscribedShard.getStreamName(), subscribedShard.getShard().getShardId(),
batch.getMillisBehindLatest(), batch.getDeaggregatedRecordSize());
}
for (UserRecord userRecord : batch.getDeaggregatedRecords()) {
if (filterDeaggregatedRecord(userRecord)) {
deserializeRecordForCollectionAndUpdateState(userRecord);
}
}
shardConsumerMetricsReporter.setAverageRecordSizeBytes(batch.getAverageRecordSizeBytes());
shardConsumerMetricsReporter.setNumberOfAggregatedRecords(batch.getAggregatedRecordSize());
shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(batch.getDeaggregatedRecordSize());
ofNullable(batch.getMillisBehindLatest()).ifPresent(shardConsumerMetricsReporter::setMillisBehindLatest);
return lastSequenceNum;
});
if (result == COMPLETE) {
fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
// we can close this consumer thread once we've reached the end of the subscribed shard
break;
} else if (result == CANCELLED) {
final String errorMessage = "Shard consumer cancelled: " + subscribedShard.getShard().getShardId();
LOG.info(errorMessage);
throw new ShardConsumerCancelledException(errorMessage);
}
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
} finally {
this.shardConsumerMetricsReporter.unregister();
}
}