in src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java [99:123]
/**
 * Collects the lease keys of all rows stored in the given lease table.
 *
 * <p>A DynamoDB Scan is paginated: a response that carries a non-empty last evaluated key means
 * more rows remain. The scan is therefore repeated, resuming from that key, until a response
 * comes back without one. Every page request is awaited with its own
 * {@code DYNAMODB_RESPONSE_TIMEOUT_SECS} timeout.
 *
 * @param leaseTable name of the lease table to scan
 * @return the lease keys read from the table, or an empty set when the failure's cause is a
 *     {@code ResourceNotFoundException} (treated as "the lease table does not exist")
 * @throws InterruptedException if the calling thread is interrupted while awaiting a page
 * @throws ExecutionException if a scan request fails for any other reason
 * @throws TimeoutException if a page does not arrive within the configured timeout
 */
private Set<String> getAllShards(String leaseTable)
    throws InterruptedException, ExecutionException, TimeoutException {
  try {
    // Fully qualified so the block does not depend on an additional import.
    Set<String> shards = new java.util.HashSet<>();
    ScanRequest scanRequest =
        ScanRequest.builder()
            .tableName(leaseTable)
            .attributesToGet(LEASE_KEY_ATTRIBUTE_NAME)
            .build();
    while (true) {
      ScanResponse scanResponse =
          dynamoDbAsyncClient
              .scan(scanRequest)
              .get(DYNAMODB_RESPONSE_TIMEOUT_SECS, TimeUnit.SECONDS);
      shards.addAll(
          scanResponse.items().stream()
              .map(i -> i.get(LEASE_KEY_ATTRIBUTE_NAME).s())
              .collect(Collectors.toSet()));

      // An absent or empty last evaluated key marks the final page of the scan.
      if (scanResponse.lastEvaluatedKey() == null || scanResponse.lastEvaluatedKey().isEmpty()) {
        return shards;
      }
      // Resume the next request right after the last row returned by this page.
      scanRequest =
          scanRequest.toBuilder().exclusiveStartKey(scanResponse.lastEvaluatedKey()).build();
    }
  } catch (Exception e) {
    // instanceof is false for a null cause, so no separate null check is required.
    if (e.getCause() instanceof ResourceNotFoundException) {
      logger.atWarning().log(
          "%s resetOffset: lease table does not exist, nothing to reset.", leaseTable);
      return Collections.emptySet();
    }
    // Precise rethrow: only the checked types declared by this method can reach here.
    throw e;
  }
}