in BDB-701-Moving-From-Transactional-to-Stateful-Batch-Processing/emr/src/main/java/com/amazon/aws/blog/function/PreFetcherFlatMapFunction.java [85:148]
/**
 * Pre-fetches the stateful artifact for every order id in this partition.
 *
 * <p>For each order id, the artifact's location (file, byte offset, byte size) is looked up in
 * DynamoDB as a {@code StatefulArtifactIndex}; order ids with no index entry are skipped. The
 * artifact content is served from the ElastiCache cluster (TLS-enabled Jedis client) when cached,
 * otherwise the byte range is read from S3 and written back to the cache. Lookups run on a pool of
 * 32 threads. The returned artifacts follow the input order of the order ids that had one.
 *
 * <p>All clients created here are closed before returning, whether the call succeeds or fails.
 *
 * @param orderIdIterator order ids of this partition
 * @return iterator over the fetched artifact contents (fully materialized)
 * @throws IllegalStateException if the lookups do not finish within 24 hours
 * @throws Exception if a DynamoDB, ElastiCache or S3 lookup fails (surfaced via
 *     {@link java.util.concurrent.ExecutionException} from the task's future)
 */
public Iterator<String> call(final Iterator<String> orderIdIterator) throws Exception {
    final AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.standard().build();
    try {
        final AmazonS3 s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.US_WEST_2).build();
        try {
            final Set<HostAndPort> jedisClusterNodes = new HashSet<>();
            jedisClusterNodes.add(getElastiCacheAddress());
            try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes,
                    DefaultJedisClientConfig.builder().ssl(true).build(), DEFAULT_REDIRECTIONS,
                    new GenericObjectPoolConfig<>())) {
                // The result is fully materialized, so it is safe to close the clients afterwards.
                return prefetchAll(orderIdIterator, new DynamoDBMapper(ddbClient), jedisCluster, s3);
            }
        } finally {
            s3.shutdown();
        }
    } finally {
        ddbClient.shutdown();
    }
}

/** Fans the per-order lookups out over a bounded thread pool and collects the non-null results in input order. */
private Iterator<String> prefetchAll(final Iterator<String> orderIdIterator, final DynamoDBMapper ddbMapper,
        final JedisCluster jedisCluster, final AmazonS3 s3) throws Exception {
    final ExecutorService executorService = Executors.newFixedThreadPool(32);
    try {
        final List<Future<String>> futures = new LinkedList<>();
        while (orderIdIterator.hasNext()) {
            final String orderId = orderIdIterator.next();
            // Load stateful artifacts in parallel by using multiple threads
            final Callable<String> task = () -> loadStatefulArtifact(ddbMapper, jedisCluster, s3, orderId);
            futures.add(executorService.submit(task));
        }
        executorService.shutdown();
        if (!executorService.awaitTermination(24, TimeUnit.HOURS)) {
            throw new IllegalStateException(
                    "Timed out after 24 hours waiting for stateful artifact pre-fetch tasks");
        }
        final List<String> output = new LinkedList<>();
        for (final Future<String> future : futures) {
            final String existingMatchSet = future.get();
            if (existingMatchSet != null) {
                output.add(existingMatchSet);
            }
        }
        return output.iterator();
    } finally {
        // No-op when all tasks already finished; on any failure this interrupts in-flight tasks
        // and releases the pool's non-daemon worker threads.
        executorService.shutdownNow();
    }
}

/** Loads one order's stateful artifact (cache first, then S3), returning null if the order has none. */
private String loadStatefulArtifact(final DynamoDBMapper ddbMapper, final JedisCluster jedisCluster,
        final AmazonS3 s3, final String orderId) throws Exception {
    final StatefulArtifactIndex index = ddbMapper.load(StatefulArtifactIndex.class, orderId);
    if (index == null) {
        // There was no pre-existing stateful artifact
        return null;
    }
    final String file = index.getFile();
    final Long offset = Long.valueOf(index.getByteOffset());
    final Long size = Long.valueOf(index.getByteSize());
    final String cacheKey = getElastiCacheKey(file, offset, size);
    final String cachedContent = jedisCluster.get(cacheKey);
    if (cachedContent != null) {
        // Cache hit so we can return the value in ElastiCache
        return cachedContent;
    }
    // Cache miss: read only this artifact's byte range (the range end is inclusive) from S3
    final GetObjectRequest objectRequest = new GetObjectRequest(S3_BUCKET_PREFIX + this.awsAccount, file)
            .withRange(offset, offset + size - 1);
    final String fetchedArtifact;
    // Closing the S3Object releases its underlying HTTP connection back to the client's pool
    try (S3Object s3Object = s3.getObject(objectRequest)) {
        fetchedArtifact = new String(IOUtils.toByteArray(s3Object.getObjectContent()), StandardCharsets.UTF_8);
    }
    // Populate the cache so later lookups for this artifact avoid S3
    jedisCluster.set(cacheKey, fetchedArtifact);
    return fetchedArtifact;
}