private List list()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java [362:424]


    /**
     * Scans the lease table and deserializes every returned item into a {@link Lease}.
     *
     * <p>The scan is paginated: after each page the response's {@code lastEvaluatedKey} is used as the
     * {@code exclusiveStartKey} of the next request, until either no key is returned or {@code maxPages}
     * pages have been fetched. At least one page is always requested, even if {@code maxPages} is zero or
     * negative.
     *
     * <p>NOTE: {@code limit} is forwarded as the scan request's limit, which is applied per request (page),
     * not to the total result. Per the DynamoDB Scan documentation the limit bounds the number of items
     * evaluated before the filter expression is applied, so when {@code streamIdentifier} is set a page may
     * contain fewer matching leases than {@code limit}.
     *
     * @param limit            per-request scan limit; {@code null} to leave the limit unset
     * @param maxPages         maximum number of pages to fetch; {@code null} means no page cap
     * @param streamIdentifier if non-null, only items whose stream name attribute equals the serialized
     *                         identifier are returned; {@code null} disables the filter
     * @return the leases read from the scanned pages, in the order they were returned
     * @throws DependencyException            if DynamoDB fails, the request times out, or the calling
     *                                        thread is interrupted while waiting for a response
     * @throws InvalidStateException          if the lease table does not exist
     * @throws ProvisionedThroughputException if the scan exceeds the table's provisioned throughput
     */
    private List<Lease> list(Integer limit, Integer maxPages, StreamIdentifier streamIdentifier)
            throws DependencyException, InvalidStateException, ProvisionedThroughputException {

        log.debug("Listing leases from table {}", table);

        ScanRequest.Builder scanRequestBuilder = ScanRequest.builder().tableName(table);

        if (streamIdentifier != null) {
            // Restrict the scan to leases belonging to the given stream.
            final Map<String, AttributeValue> expressionAttributeValues = ImmutableMap.of(
                    DDB_STREAM_NAME, AttributeValue.builder().s(streamIdentifier.serialize()).build()
            );
            scanRequestBuilder = scanRequestBuilder.filterExpression(STREAM_NAME + " = " + DDB_STREAM_NAME)
                    .expressionAttributeValues(expressionAttributeValues);
        }

        if (limit != null) {
            scanRequestBuilder = scanRequestBuilder.limit(limit);
        }
        ScanRequest scanRequest = scanRequestBuilder.build();

        // Pass these two exception types through unchanged so the outer catch clauses can translate them.
        final AWSExceptionManager exceptionManager = createExceptionManager();
        exceptionManager.add(ResourceNotFoundException.class, t -> t);
        exceptionManager.add(ProvisionedThroughputExceededException.class, t -> t);

        try {
            try {
                ScanResponse scanResult = FutureUtils.resolveOrCancelFuture(dynamoDBClient.scan(scanRequest), dynamoDbRequestTimeout);
                final List<Lease> result = new ArrayList<>();
                // Counts requests issued so far; kept in a local so the (possibly null) boxed parameter is
                // never unboxed blindly or mutated.
                long pagesFetched = 1;

                while (scanResult != null) {
                    for (Map<String, AttributeValue> item : scanResult.items()) {
                        // Pass the map itself so it is only rendered when debug logging is enabled.
                        log.debug("Got item {} from DynamoDB.", item);
                        result.add(serializer.fromDynamoRecord(item));
                    }

                    final Map<String, AttributeValue> lastEvaluatedKey = scanResult.lastEvaluatedKey();
                    if (CollectionUtils.isNullOrEmpty(lastEvaluatedKey)) {
                        // Signify that we're done: DynamoDB has no further pages.
                        scanResult = null;
                        log.debug("lastEvaluatedKey was null - scan finished.");
                    } else if (maxPages != null && pagesFetched >= maxPages) {
                        // Signify that we're done: the caller's page cap was reached with results remaining.
                        scanResult = null;
                        log.debug("Reached maxPages limit of {} - stopping scan before the table was exhausted.",
                                maxPages);
                    } else {
                        // Make another request, picking up where we left off.
                        scanRequest = scanRequest.toBuilder().exclusiveStartKey(lastEvaluatedKey).build();
                        log.debug("lastEvaluatedKey was {}, continuing scan.", lastEvaluatedKey);
                        scanResult = FutureUtils.resolveOrCancelFuture(dynamoDBClient.scan(scanRequest), dynamoDbRequestTimeout);
                        pagesFetched++;
                    }
                }
                log.debug("Listed {} leases from table {}", result.size(), table);
                return result;
            } catch (ExecutionException e) {
                // Unwrap the async failure; the rethrown exception is translated by the outer catch clauses.
                throw exceptionManager.apply(e.getCause());
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers further up the stack can observe the interruption.
                Thread.currentThread().interrupt();
                throw new DependencyException(e);
            }
        } catch (ResourceNotFoundException e) {
            throw new InvalidStateException("Cannot scan lease table " + table + " because it does not exist.", e);
        } catch (ProvisionedThroughputExceededException e) {
            throw new ProvisionedThroughputException(e);
        } catch (DynamoDbException | TimeoutException e) {
            throw new DependencyException(e);
        }
    }