in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java [362:424]
/**
 * Scans the lease table and returns the leases found, following DynamoDB pagination until either
 * the scan reports no further pages or the page budget is used up.
 *
 * @param limit if non-null, set as the {@code limit} of every scan request issued by this call;
 *        if null, no limit is set on the request
 * @param maxPages maximum number of scan pages to fetch. The first page is always fetched, so
 *        values of one or less yield a single page. {@code null} means no page cap.
 * @param streamIdentifier if non-null, the scan carries a filter expression that keeps only items
 *        whose stream-name attribute equals {@code streamIdentifier.serialize()}; if null, no
 *        filter is applied
 * @return the leases deserialized from every item returned by the fetched pages, in scan order
 * @throws DependencyException if the scan fails with a {@code DynamoDbException}, times out, or the
 *         calling thread is interrupted while waiting for the response
 * @throws InvalidStateException if the scan fails with a {@code ResourceNotFoundException}
 * @throws ProvisionedThroughputException if the scan fails with a
 *         {@code ProvisionedThroughputExceededException}
 */
private List<Lease> list(Integer limit, Integer maxPages, StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException,
        ProvisionedThroughputException {
    log.debug("Listing leases from table {}", table);
    ScanRequest.Builder scanRequestBuilder = ScanRequest.builder().tableName(table);
    if (streamIdentifier != null) {
        final Map<String, AttributeValue> expressionAttributeValues = ImmutableMap.of(
                DDB_STREAM_NAME, AttributeValue.builder().s(streamIdentifier.serialize()).build()
        );
        scanRequestBuilder = scanRequestBuilder.filterExpression(STREAM_NAME + " = " + DDB_STREAM_NAME)
                .expressionAttributeValues(expressionAttributeValues);
    }
    if (limit != null) {
        scanRequestBuilder = scanRequestBuilder.limit(limit);
    }
    ScanRequest scanRequest = scanRequestBuilder.build();

    // Count down on a local primitive rather than mutating the boxed parameter. A null page cap is
    // treated as "unbounded" instead of failing with a NullPointerException on the second page.
    int pagesRemaining = maxPages == null ? Integer.MAX_VALUE : maxPages;

    // Register identity mappings for the two exception types that the outer catch clauses translate.
    final AWSExceptionManager exceptionManager = createExceptionManager();
    exceptionManager.add(ResourceNotFoundException.class, t -> t);
    exceptionManager.add(ProvisionedThroughputExceededException.class, t -> t);

    // The try blocks are nested on purpose: whatever exceptionManager.apply produces is thrown from
    // the inner catch, and only an enclosing try can then map it to the declared exception types.
    try {
        try {
            ScanResponse scanResult = FutureUtils.resolveOrCancelFuture(dynamoDBClient.scan(scanRequest), dynamoDbRequestTimeout);
            List<Lease> result = new ArrayList<>();
            while (scanResult != null) {
                for (Map<String, AttributeValue> item : scanResult.items()) {
                    // Pass the map itself so it is only rendered when debug logging is enabled.
                    log.debug("Got item {} from DynamoDB.", item);
                    result.add(serializer.fromDynamoRecord(item));
                }
                Map<String, AttributeValue> lastEvaluatedKey = scanResult.lastEvaluatedKey();
                if (CollectionUtils.isNullOrEmpty(lastEvaluatedKey)) {
                    // Signify that we're done: there is nothing left to scan.
                    scanResult = null;
                    log.debug("lastEvaluatedKey was null - scan finished.");
                } else if (--pagesRemaining <= 0) {
                    // Signify that we're done: more pages exist, but the page budget is exhausted.
                    scanResult = null;
                    log.debug("Reached the maximum number of pages - stopping scan with lastEvaluatedKey {}.",
                            lastEvaluatedKey);
                } else {
                    // Make another request, picking up where we left off.
                    scanRequest = scanRequest.toBuilder().exclusiveStartKey(lastEvaluatedKey).build();
                    log.debug("lastEvaluatedKey was {}, continuing scan.", lastEvaluatedKey);
                    scanResult = FutureUtils.resolveOrCancelFuture(dynamoDBClient.scan(scanRequest), dynamoDbRequestTimeout);
                }
            }
            log.debug("Listed {} leases from table {}", result.size(), table);
            return result;
        } catch (ExecutionException e) {
            throw exceptionManager.apply(e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it;
            // wrapping the exception alone would silently clear the flag.
            Thread.currentThread().interrupt();
            throw new DependencyException(e);
        }
    } catch (ResourceNotFoundException e) {
        throw new InvalidStateException("Cannot scan lease table " + table + " because it does not exist.", e);
    } catch (ProvisionedThroughputExceededException e) {
        throw new ProvisionedThroughputException(e);
    } catch (DynamoDbException | TimeoutException e) {
        throw new DependencyException(e);
    }
}