in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java [110:164]
/**
 * Main consumption loop: repeatedly drives the {@code recordPublisher} until the shard is
 * exhausted, the consumer is cancelled, or an error occurs. Each published batch is
 * deserialized, forwarded, and reflected in the shard metrics. Any throwable (including the
 * deliberate {@link ShardConsumerCancelledException}) is handed to the fetcher via
 * {@code stopWithError}; metrics are always unregistered on exit.
 */
public void run() {
    try {
        while (isRunning()) {
            final RecordPublisherRunResult runResult =
                    recordPublisher.run(
                            recordBatch -> {
                                // Only log when the batch actually carries records, to keep
                                // debug output useful.
                                if (!recordBatch.getDeaggregatedRecords().isEmpty()) {
                                    LOG.debug(
                                            "stream: {}, shard: {}, millis behind latest: {}, batch size: {}",
                                            subscribedShard.getStreamName(),
                                            subscribedShard.getShard().getShardId(),
                                            recordBatch.getMillisBehindLatest(),
                                            recordBatch.getDeaggregatedRecordSize());
                                }

                                // Emit every record that passes the shard filter downstream,
                                // advancing the tracked sequence number as we go.
                                for (UserRecord userRec : recordBatch.getDeaggregatedRecords()) {
                                    if (filterDeaggregatedRecord(userRec)) {
                                        deserializeRecordForCollectionAndUpdateState(userRec);
                                    }
                                }

                                // Publish per-batch metrics; millisBehindLatest may be absent.
                                shardConsumerMetricsReporter.setAverageRecordSizeBytes(
                                        recordBatch.getAverageRecordSizeBytes());
                                shardConsumerMetricsReporter.setNumberOfAggregatedRecords(
                                        recordBatch.getAggregatedRecordSize());
                                shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(
                                        recordBatch.getDeaggregatedRecordSize());
                                ofNullable(recordBatch.getMillisBehindLatest())
                                        .ifPresent(
                                                millis ->
                                                        shardConsumerMetricsReporter
                                                                .setMillisBehindLatest(millis));

                                return lastSequenceNum;
                            });

            if (runResult == COMPLETE) {
                // Mark the shard as ended so state/restart logic knows we finished it,
                // then terminate this consumer thread.
                fetcherRef.updateState(
                        subscribedShardStateIndex,
                        SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
                break;
            }

            // A CANCELLED result while still "running" means the publisher was cancelled
            // out from under us — surface it as an error to the fetcher.
            if (isRunning() && runResult == CANCELLED) {
                final String errorMessage =
                        "Shard consumer cancelled: " + subscribedShard.getShard().getShardId();
                LOG.info(errorMessage);
                throw new ShardConsumerCancelledException(errorMessage);
            }
        }
    } catch (Throwable t) {
        // All failures (including cancellation) are routed through the fetcher.
        fetcherRef.stopWithError(t);
    } finally {
        shardConsumerMetricsReporter.unregister();
    }
}