public Iterator call()

in BDB-701-Moving-From-Transactional-to-Stateful-Batch-Processing/emr/src/main/java/com/amazon/aws/blog/function/PreFetcherFlatMapFunction.java [85:148]

import java.io.IOException;

    /**
     * Fetches the pre-existing stateful artifact for every order ID produced by the iterator.
     *
     * <p>For each order ID the artifact index is loaded from DynamoDB; the artifact itself is then read
     * from ElastiCache, falling back to a ranged S3 read (and populating the cache) on a miss. Lookups
     * run concurrently on a fixed pool of 32 threads. Order IDs with no index entry contribute nothing
     * to the result.
     *
     * <p>All clients created here (DynamoDB, S3, Jedis cluster, thread pool) are released before the
     * method returns, whether it completes normally or throws.
     *
     * @param orderIdIterator the order IDs to look up
     * @return an iterator over the artifacts that were found, in the order their IDs were supplied
     * @throws IllegalStateException if the lookups do not finish within 24 hours
     * @throws Exception if any individual lookup fails (surfaced via {@link Future#get()}) or the
     *         calling thread is interrupted while waiting
     */
    public Iterator<String> call(final Iterator<String> orderIdIterator) throws Exception {
        final AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.standard().build();
        final AmazonS3 s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.US_WEST_2).build();
        try {
            final DynamoDBMapper ddbMapper = new DynamoDBMapper(ddbClient);

            final Set<HostAndPort> jedisClusterNodes = new HashSet<>();
            jedisClusterNodes.add(getElastiCacheAddress());

            // try-with-resources so the cluster's connection pool is released even when a lookup fails
            try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes,
                    DefaultJedisClientConfig.builder().ssl(true).build(), DEFAULT_REDIRECTIONS, new GenericObjectPoolConfig<>())) {
                return loadArtifactsInParallel(orderIdIterator, ddbMapper, jedisCluster, s3);
            }
        } finally {
            // The clients are created per call, so they are safe to tear down here; the returned
            // iterator is backed by an already-materialized list and does not touch them.
            s3.shutdown();
            ddbClient.shutdown();
        }
    }

    /**
     * Submits one lookup per order ID to a fixed thread pool, waits for all of them and collects the
     * non-null results. The pool is always shut down before returning.
     */
    private Iterator<String> loadArtifactsInParallel(final Iterator<String> orderIdIterator,
                                                     final DynamoDBMapper ddbMapper,
                                                     final JedisCluster jedisCluster,
                                                     final AmazonS3 s3) throws Exception {
        final List<Future<String>> futures = new LinkedList<>();
        final ExecutorService executorService = Executors.newFixedThreadPool(32);
        try {
            while (orderIdIterator.hasNext()) {
                final String orderId = orderIdIterator.next();

                // Load stateful artifacts in parallel by using multiple threads
                final Callable<String> callable = () -> loadStatefulArtifact(orderId, ddbMapper, jedisCluster, s3);
                futures.add(executorService.submit(callable));
            }

            executorService.shutdown();

            if (!executorService.awaitTermination(24, TimeUnit.HOURS)) {
                // Fail loudly instead of returning a null iterator; the finally block cancels
                // whatever is still running.
                throw new IllegalStateException("Timed out waiting for stateful artifacts to be pre-fetched");
            }

            final List<String> output = new LinkedList<>();
            for (final Future<String> future : futures) {
                final String existingMatchSet = future.get();
                if (existingMatchSet != null) {
                    output.add(existingMatchSet);
                }
            }
            return output.iterator();
        } finally {
            // No-op after a clean shutdown; on any failure path it interrupts outstanding workers
            // so the pool's threads do not outlive this call.
            executorService.shutdownNow();
        }
    }

    /**
     * Loads the stateful artifact for a single order ID, or returns {@code null} when the order has
     * no index entry. Reads from ElastiCache first and falls back to a ranged S3 read on a miss,
     * writing the fetched content back to the cache.
     */
    private String loadStatefulArtifact(final String orderId,
                                        final DynamoDBMapper ddbMapper,
                                        final JedisCluster jedisCluster,
                                        final AmazonS3 s3) throws IOException {
        final StatefulArtifactIndex index = ddbMapper.load(StatefulArtifactIndex.class, orderId);
        if (index == null) {
            // There was no pre-existing stateful artifact
            return null;
        }

        final String file = index.getFile();
        final Long offset = Long.valueOf(index.getByteOffset());
        final Long size = Long.valueOf(index.getByteSize());
        final String cacheKey = getElastiCacheKey(file, offset, size);

        final String cachedContent = jedisCluster.get(cacheKey);
        if (cachedContent != null) {
            // Cache hit so we can return the value in ElastiCache
            return cachedContent;
        }

        // Cache miss so we must default back to the backing S3 storage
        // S3 objects can be retrieved using the partial seek feature (range end is inclusive)
        final GetObjectRequest objectRequest = new GetObjectRequest(S3_BUCKET_PREFIX + this.awsAccount, file)
                .withRange(offset, offset + size - 1);

        // Closing the S3Object releases the underlying HTTP connection back to the client's pool
        try (S3Object s3Object = s3.getObject(objectRequest)) {
            final String fetchedArtifact =
                    new String(IOUtils.toByteArray(s3Object.getObjectContent()), StandardCharsets.UTF_8);

            // Adds the retrieved stateful artifact to ElastiCache before returning
            jedisCluster.set(cacheKey, fetchedArtifact);
            return fetchedArtifact;
        }
    }