public TaskResult call()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java [115:181]


    /**
     * Processes the batch held by {@code processRecordsInput} and publishes metrics for this invocation.
     *
     * <p>Two metrics scopes are opened: an application-level scope and a shard-level scope. The shard-level
     * scope is tagged with the shard id and, when {@code shardInfo} carries a serialized stream identifier,
     * with the stream id. Success and latency are recorded on the shard-level scope and both scopes are
     * ended in the {@code finally} block, whichever path returns.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Record {@code millisBehindLatest} on both scopes when it is non-null.</li>
     *   <li>If the input is at shard end and has no records, return immediately.</li>
     *   <li>Otherwise de-aggregate the records, optionally decode them through
     *       {@code schemaRegistryDecoder}, update the largest permitted checkpoint value, and call
     *       {@code callProcessRecords} when {@code shouldCallProcessRecords} says so.</li>
     * </ol>
     *
     * <p>Only {@link RuntimeException}s raised in those steps are caught; they are logged, followed by a
     * call to {@code backoff()}, and the invocation is recorded as unsuccessful.
     *
     * @return {@code new TaskResult(null, true)} when the input is at shard end; otherwise a
     *         {@code TaskResult} wrapping the caught exception, or {@code null} if none was caught
     */
    public TaskResult call() {
        /*
         * NOTE: the difference between appScope and shardScope is that appScope doesn't have shardId as a
         * dimension. All data added to appScope, even though it comes from different shard consumers, is
         * therefore sent to the same metric: the app-level MILLIS_BEHIND_LATEST_METRIC.
         */
        final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_TRACKER_OPERATION);
        final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
        shardInfo.streamIdentifierSerOpt()
                .ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId)));
        MetricsUtil.addShardId(shardScope, shardInfo.shardId());
        long startTimeMillis = System.currentTimeMillis();
        boolean success = false;
        try {
            // Seed both counters with zero so the metrics are emitted even when nothing gets processed.
            shardScope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
            shardScope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY);
            Exception exception = null;

            try {
                if (processRecordsInput.millisBehindLatest() != null) {
                    shardScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
                            StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
                    appScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
                            StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
                }

                if (processRecordsInput.isAtShardEnd() && processRecordsInput.records().isEmpty()) {
                    log.info("Reached end of shard {} and have no records to process", shardInfoId);
                    // Nothing failed on this path, so the finally block must not record a failure.
                    success = true;
                    return new TaskResult(null, true);
                }

                throttlingReporter.success();
                List<KinesisClientRecord> records = deaggregateAnyKplRecords(processRecordsInput.records());

                if (schemaRegistryDecoder != null) {
                    records = schemaRegistryDecoder.decode(records);
                }

                if (!records.isEmpty()) {
                    shardScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
                }

                // Update the largest permitted checkpoint value from this batch before the records are
                // handed to callProcessRecords.
                recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(
                        shardScope, records, recordProcessorCheckpointer.lastCheckpointValue(),
                        recordProcessorCheckpointer.largestPermittedCheckpointValue()));

                if (shouldCallProcessRecords(records)) {
                    callProcessRecords(processRecordsInput, records);
                }
                success = true;
            } catch (RuntimeException e) {
                log.error("ShardId {}: Caught exception: ", shardInfoId, e);
                exception = e;
                backoff();
            }

            // NOTE(review): when the input is at shard end, an exception caught above is logged but not
            // carried in the returned result. Behavior kept as-is; confirm this is intended.
            if (processRecordsInput.isAtShardEnd()) {
                log.info("Reached end of shard {}, and processed {} records", shardInfoId, processRecordsInput.records().size());
                return new TaskResult(null, true);
            }
            return new TaskResult(exception);
        } finally {
            // Runs for every return path above, including the early shard-end return.
            MetricsUtil.addSuccessAndLatency(shardScope, success, startTimeMillis, MetricsLevel.SUMMARY);
            MetricsUtil.endScope(shardScope);
            MetricsUtil.endScope(appScope);
        }
    }