in amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java [115:181]
public TaskResult call() {
    /*
     * Processes the batch held in processRecordsInput: publishes lag/throughput metrics, de-aggregates (and
     * optionally schema-decodes) the records, advances the largest permitted checkpoint value, and hands the
     * records to the record processor when shouldCallProcessRecords allows it.
     *
     * Returns a TaskResult built with (null, true) when the input is flagged as at shard end, otherwise a
     * TaskResult wrapping the RuntimeException caught during processing (null when nothing was thrown).
     *
     * NOTE: the difference between appScope and shardScope is, appScope doesn't have shardId as a dimension,
     * therefore all data added to appScope, although from different shard consumers, will be sent to the same
     * metric, which is the app-level MillisBehindLatest metric.
     */
    final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_TRACKER_OPERATION);
    final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
    // The stream id is only added when shardInfo carries a serialized stream identifier.
    shardInfo.streamIdentifierSerOpt()
            .ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId)));
    MetricsUtil.addShardId(shardScope, shardInfo.shardId());
    long startTimeMillis = System.currentTimeMillis();
    boolean success = false;
    try {
        // Seed both counters with zero before any work is done.
        // NOTE(review): presumably so the metrics are emitted even when nothing gets processed - confirm.
        shardScope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
        shardScope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY);
        Exception exception = null;

        try {
            if (processRecordsInput.millisBehindLatest() != null) {
                // Same lag value goes to the per-shard scope and to the application-wide scope.
                shardScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
                        StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
                appScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
                        StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
            }

            if (processRecordsInput.isAtShardEnd() && processRecordsInput.records().isEmpty()) {
                log.info("Reached end of shard {} and have no records to process", shardInfoId);
                // Nothing failed on this path, so the success/latency metric written in the finally
                // block must not report this normal completion as a failure.
                success = true;
                // NOTE(review): the second constructor argument is presumably the shard-end flag - confirm
                // against TaskResult.
                return new TaskResult(null, true);
            }

            throttlingReporter.success();
            List<KinesisClientRecord> records = deaggregateAnyKplRecords(processRecordsInput.records());

            // Schema decoding is optional; it only runs when a decoder has been configured.
            if (schemaRegistryDecoder != null) {
                records = schemaRegistryDecoder.decode(records);
            }

            if (!records.isEmpty()) {
                shardScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
            }

            // Update the checkpointer's upper bound from this batch before the processor sees the records.
            recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(
                    shardScope, records, recordProcessorCheckpointer.lastCheckpointValue(),
                    recordProcessorCheckpointer.largestPermittedCheckpointValue()));

            if (shouldCallProcessRecords(records)) {
                callProcessRecords(processRecordsInput, records);
            }
            success = true;
        } catch (RuntimeException e) {
            // The failure is logged and carried in the TaskResult instead of being rethrown; success stays false.
            log.error("ShardId {}: Caught exception: ", shardInfoId, e);
            exception = e;
            backoff();
        }

        if (processRecordsInput.isAtShardEnd()) {
            // NOTE(review): a RuntimeException caught above is not included in this result when the input is
            // at shard end - confirm this is intended.
            log.info("Reached end of shard {}, and processed {} records", shardInfoId, processRecordsInput.records().size());
            return new TaskResult(null, true);
        }
        return new TaskResult(exception);
    } finally {
        // Runs on every exit path, including the early returns above.
        MetricsUtil.addSuccessAndLatency(shardScope, success, startTimeMillis, MetricsLevel.SUMMARY);
        MetricsUtil.endScope(shardScope);
        MetricsUtil.endScope(appScope);
    }
}